JBoss hornetq SVN: r11016 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-22 09:52:26 -0400 (Fri, 22 Jul 2011)
New Revision: 11016
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
Close the session Factory at tearDown
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-22 02:38:58 UTC (rev 11015)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-22 13:52:26 UTC (rev 11016)
@@ -101,6 +101,7 @@
@Override
protected void tearDown() throws Exception
{
+ closeSessionFactory();
locator.close();
super.tearDown();
}
@@ -302,7 +303,6 @@
session.close();
- closeSessionFactory();
}
// https://jira.jboss.org/jira/browse/HORNETQ-285
@@ -334,8 +334,6 @@
receiveMessages(consumer);
session.close();
-
- closeSessionFactory();
}
public void testTransactedMessagesSentSoRollback() throws Exception
@@ -1356,7 +1354,6 @@
session.close();
- closeSessionFactory();
}
public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception
@@ -1504,6 +1501,8 @@
private void closeSessionFactory()
{
+ if (sf == null)
+ return;
sf.close();
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
@@ -1616,7 +1615,6 @@
session2.close();
- closeSessionFactory();
}
public void testBackupServerNotRemoved() throws Exception
@@ -1663,8 +1661,6 @@
producer.send(message);
session.close();
-
- closeSessionFactory();
}
public void testLiveAndBackupLiveComesBack() throws Exception
@@ -1713,7 +1709,6 @@
session.close();
- closeSessionFactory();
}
public void testLiveAndBackupLiveComesBackNewFactory() throws Exception
13 years, 5 months
JBoss hornetq SVN: r11015 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-21 22:38:58 -0400 (Thu, 21 Jul 2011)
New Revision: 11015
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
Log:
adding logs on test
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-07-22 02:38:25 UTC (rev 11014)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-07-22 02:38:58 UTC (rev 11015)
@@ -154,15 +154,15 @@
closeSessionFactory(1);
+ log.info("*********** Stopping server 1");
stopServers(1);
+ log.info("*********** Stopped server 1");
- Thread.sleep(12000);
+ Thread.sleep(1000);
System.out.println(clusterDescription(servers[0]));
startServers(1);
-
- Thread.sleep(3000);
System.out.println(clusterDescription(servers[0]));
System.out.println(clusterDescription(servers[1]));
13 years, 5 months
JBoss hornetq SVN: r11014 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-21 22:38:25 -0400 (Thu, 21 Jul 2011)
New Revision: 11014
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
fixing ServerLocator (wrong static)
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-21 01:27:25 UTC (rev 11013)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-22 02:38:25 UTC (rev 11014)
@@ -158,7 +158,7 @@
private static ExecutorService globalThreadPool;
- private static Executor startExecutor;
+ private Executor startExecutor;
private static ScheduledExecutorService globalScheduledThreadPool;
13 years, 5 months
JBoss hornetq SVN: r11013 - in branches/Branch_2_2_EAP_cluster_clean2: src/main/org/hornetq/core/server/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-20 21:27:25 -0400 (Wed, 20 Jul 2011)
New Revision: 11013
Added:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
HORNETQ-725, HORNETQ-744, HORNETQ-743 - a few paging fixes
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-07-21 01:16:45 UTC (rev 11012)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-07-21 01:27:25 UTC (rev 11013)
@@ -89,6 +89,10 @@
public synchronized PageSubscription createSubscription(long cursorID, Filter filter, boolean persistent)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this.pagingStore.getAddress() + " creating subscription " + cursorID + " with filter " + filter, new Exception ("trace"));
+ }
PageSubscription activeCursor = activeCursors.get(cursorID);
if (activeCursor != null)
{
@@ -330,6 +334,11 @@
{
return;
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Asserting cleanup for address " + this.pagingStore.getAddress());
+ }
ArrayList<PageSubscription> cursorList = new ArrayList<PageSubscription>();
cursorList.addAll(activeCursors.values());
@@ -344,9 +353,21 @@
{
if (!cursor.isComplete(minPage))
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Cursor " + cursor + " was considered incomplete at page " + minPage);
+ }
+
complete = false;
break;
}
+ else
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("Cursor " + cursor + "was considered **complete** at page " + minPage);
+ }
+ }
}
if (complete)
@@ -516,12 +537,21 @@
for (PageSubscription cursor : cursorList)
{
long firstPage = cursor.getFirstPage();
+ if (log.isDebugEnabled())
+ {
+ log.debug(this.pagingStore.getAddress() + " has a cursor " + cursor + " with first page=" + firstPage);
+ }
if (firstPage < minPage)
{
minPage = firstPage;
}
}
+ if (log.isDebugEnabled())
+ {
+ log.debug(this.pagingStore.getAddress() + " has minPage=" + minPage);
+ }
+
return minPage;
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-07-21 01:16:45 UTC (rev 11012)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-07-21 01:27:25 UTC (rev 11013)
@@ -308,7 +308,7 @@
@Override
public String toString()
{
- return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue + "]";
+ return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue + ", filter = " + filter + "]";
}
@@ -648,22 +648,42 @@
Collections.sort(recoveredACK);
boolean first = true;
+
+ long txDeleteCursorOnReload = -1;
for (PagePosition pos : recoveredACK)
{
lastAckedPosition = pos;
- PageCursorInfo positions = getPageInfo(pos);
- if (first)
+ PageCursorInfo pageInfo = getPageInfo(pos);
+
+ if (pageInfo == null)
{
- first = false;
- if (pos.getMessageNr() > 0)
+ log.warn("Couldn't find page cache for page " + pos + ", removing it from the journal");
+ if (txDeleteCursorOnReload == -1)
{
- positions.confirmed.addAndGet(pos.getMessageNr());
+ txDeleteCursorOnReload = store.generateUniqueID();
}
+ store.deleteCursorAcknowledgeTransactional(txDeleteCursorOnReload, pos.getRecordID());
+ }
+ else
+ {
+ if (first)
+ {
+ first = false;
+ if (pos.getMessageNr() > 0)
+ {
+ pageInfo.confirmed.addAndGet(pos.getMessageNr());
+ }
+ }
+
+ pageInfo.addACK(pos);
}
-
- positions.addACK(pos);
}
+
+ if (txDeleteCursorOnReload >= 0)
+ {
+ store.commit(txDeleteCursorOnReload);
+ }
recoveredACK.clear();
recoveredACK = null;
@@ -723,6 +743,10 @@
if (create && pageInfo == null)
{
PageCache cache = cursorProvider.getPageCache(pos);
+ if (cache == null)
+ {
+ return null;
+ }
pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(), cache);
consumedPages.put(pos.getPageNr(), pageInfo);
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-21 01:16:45 UTC (rev 11012)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-21 01:27:25 UTC (rev 11013)
@@ -74,6 +74,7 @@
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.postoffice.QueueBinding;
import org.hornetq.core.postoffice.impl.DivertBinding;
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
@@ -1625,28 +1626,31 @@
{
queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
- Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
-
- PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(), filter, true);
+ if (queueBindingInfo.getFilterString() == null || !queueBindingInfo.getFilterString().toString().equals(GENERIC_IGNORED_FILTER))
+ {
+ Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
+
+ PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(), filter, true);
+
+ Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
+ queueBindingInfo.getAddress(),
+ queueBindingInfo.getQueueName(),
+ filter,
+ subscription,
+ true,
+ false);
+
+ Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());
+
+ queues.put(queueBindingInfo.getId(), queue);
+
+ postOffice.addBinding(binding);
+
+ managementService.registerAddress(queueBindingInfo.getAddress());
+ managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
+ }
- Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
- queueBindingInfo.getAddress(),
- queueBindingInfo.getQueueName(),
- filter,
- subscription,
- true,
- false);
-
- Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());
-
- queues.put(queueBindingInfo.getId(), queue);
-
- postOffice.addBinding(binding);
-
- managementService.registerAddress(queueBindingInfo.getAddress());
- managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
-
}
for (GroupingInfo groupingInfo : groupingInfos)
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-07-21 01:16:45 UTC (rev 11012)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-07-21 01:27:25 UTC (rev 11013)
@@ -265,7 +265,7 @@
// If updateDeliveries = false (set by strict-update),
// the updateDeliveryCount would still be updated after c
- if (strictUpdateDeliveryCount)
+ if (strictUpdateDeliveryCount && !ref.isPaged())
{
if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue() && !ref.isPaged())
{
Added: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java (rev 0)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java 2011-07-21 01:27:25 UTC (rev 11013)
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.JMSTestBase;
+
+public class JMSPagingFileDeleteTest extends JMSTestBase
+{
+ static Logger log = Logger.getLogger(JMSPagingFileDeleteTest.class);
+
+ Topic topic1;
+
+ Connection connection;
+
+ Session session;
+
+ MessageConsumer subscriber1;
+
+ MessageConsumer subscriber2;
+
+ PagingStore pagingStore;
+
+ private static final int MESSAGE_SIZE = 1024;
+
+ private static final int PAGE_SIZE = 10 * 1024;
+
+ private static final int PAGE_MAX = 20 * 1024;
+
+ private static final int RECEIVE_TIMEOUT = 500;
+
+ private static final int MESSAGE_NUM = 50;
+
+ @Override
+ protected boolean usePersistence()
+ {
+ return true;
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ clearData();
+ super.setUp();
+
+ topic1 = createTopic("topic1");
+
+ // Paging Setting
+ AddressSettings setting = new AddressSettings();
+ setting.setPageSizeBytes(JMSPagingFileDeleteTest.PAGE_SIZE);
+ setting.setMaxSizeBytes(JMSPagingFileDeleteTest.PAGE_MAX);
+ server.getAddressSettingsRepository().addMatch("#", setting);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ topic1 = null;
+ super.tearDown();
+ }
+
+ public void testTopics() throws Exception
+ {
+ connection = null;
+
+ try
+ {
+ connection = cf.createConnection();
+ connection.setClientID("cid");
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(topic1);
+ subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
+ subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
+
+ // -----------------(Step1) Publish Messages to make Paging Files. --------------------
+ System.out.println("---------- Send messages. ----------");
+ BytesMessage bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
+ for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
+ {
+ producer.send(bytesMessage);
+ }
+ System.out.println("Sent " + JMSPagingFileDeleteTest.MESSAGE_NUM + " messages.");
+
+ pagingStore = server.getPagingManager().getPageStore(new SimpleString("jms.topic.topic1"));
+ printPageStoreInfo(pagingStore);
+
+ assertTrue(pagingStore.isPaging());
+
+ connection.start();
+
+ // -----------------(Step2) Restart the server. --------------------------------------
+ stopAndStartServer(); // If try this test without restarting server, please comment out this line;
+
+ // -----------------(Step3) Subscribe to all the messages from the topic.--------------
+ System.out.println("---------- Receive all messages. ----------");
+ for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
+ {
+ Message message1 = subscriber1.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT);
+ assertNotNull(message1);
+ Message message2 = subscriber2.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT);
+ assertNotNull(message2);
+ }
+
+ pagingStore = server.getPagingManager().getPageStore(new SimpleString("jms.topic.topic1"));
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && pagingStore.isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse(pagingStore.isPaging());
+
+ printPageStoreInfo(pagingStore);
+
+ assertEquals(0, pagingStore.getAddressSize());
+ // assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page is 1, but It was not.
+ assertFalse(pagingStore.isPaging()); // I expected IsPaging is false, but It was true.
+ // If the server is not restart, this test pass.
+
+ // -----------------(Step4) Publish a message. the message is stored in the paging file.
+ producer = session.createProducer(topic1);
+ bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
+ producer.send(bytesMessage);
+
+ printPageStoreInfo(pagingStore);
+
+ assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page is 1, but It was not.
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private void stopAndStartServer() throws Exception
+ {
+ System.out.println("---------- Restart server. ----------");
+ connection.close();
+
+ jmsServer.stop();
+
+ jmsServer.start();
+ jmsServer.activated();
+ registerConnectionFactory();
+
+ printPageStoreInfo(pagingStore);
+ reconnect();
+ }
+
+ private void reconnect() throws Exception
+ {
+ connection = cf.createConnection();
+ connection.setClientID("cid");
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
+ subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
+ connection.start();
+ }
+
+ private void printPageStoreInfo(PagingStore pagingStore) throws Exception
+ {
+ System.out.println("---------- Paging Store Info ----------");
+ System.out.println(" CurrentPage = " + pagingStore.getCurrentPage());
+ System.out.println(" FirstPage = " + pagingStore.getFirstPage());
+ System.out.println(" Number of Pages = " + pagingStore.getNumberOfPages());
+ System.out.println(" Address Size = " + pagingStore.getAddressSize());
+ System.out.println(" Is Paging = " + pagingStore.isPaging());
+ }
+}
\ No newline at end of file
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-07-21 01:16:45 UTC (rev 11012)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-07-21 01:27:25 UTC (rev 11013)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration.client;
+import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -497,6 +498,248 @@
}
+ /**
+ * This test will remove all the page directories during a restart, simulating a crash scenario. The server should still start after this
+ */
+ public void testDeletePhisicalPages() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+ config.setPersistDeliveryCountBeforeDelivery(true);
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 1000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ session.close();
+
+ session = null;
+
+ sf.close();
+ locator.close();
+
+ server.stop();
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+
+ Queue queue = server.locateQueue(ADDRESS);
+
+ assertEquals(numberOfMessages, queue.getMessageCount());
+
+ LinkedList<Xid> xids = new LinkedList<Xid>();
+
+ int msgReceived = 0;
+ ClientSession sessionConsumer = sf.createSession(false, false, false);
+ sessionConsumer.start();
+ ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+ for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
+ {
+ log.info("Received " + msgCount);
+ msgReceived++;
+ ClientMessage msg = consumer.receiveImmediate();
+ if (msg == null)
+ {
+ log.info("It's null. leaving now");
+ sessionConsumer.commit();
+ fail("Didn't receive a message");
+ }
+ msg.acknowledge();
+
+ if (msgCount % 5 == 0)
+ {
+ log.info("commit");
+ sessionConsumer.commit();
+ }
+ }
+
+ sessionConsumer.commit();
+
+ sessionConsumer.close();
+
+ sf.close();
+
+ locator.close();
+
+ assertEquals(0, queue.getMessageCount());
+
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
+
+ server.stop();
+
+ // Deleting the paging data. Simulating a failure
+ // a dumb user, or anything that will remove the data
+ deleteDirectory(new File(getPageDir()));
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+
+ locator = createInVMNonHALocator();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ sf = locator.createSessionFactory();
+
+ queue = server.locateQueue(ADDRESS);
+
+ sf = locator.createSessionFactory();
+ session = sf.createSession(false, false, false);
+
+ producer = session.createProducer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ server.stop();
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+
+ queue = server.locateQueue(ADDRESS);
+
+ // assertEquals(numberOfMessages, queue.getMessageCount());
+
+ xids = new LinkedList<Xid>();
+
+ msgReceived = 0;
+ sessionConsumer = sf.createSession(false, false, false);
+ sessionConsumer.start();
+ consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+ for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
+ {
+ log.info("Received " + msgCount);
+ msgReceived++;
+ ClientMessage msg = consumer.receiveImmediate();
+ if (msg == null)
+ {
+ log.info("It's null. leaving now");
+ sessionConsumer.commit();
+ fail("Didn't receive a message");
+ }
+ msg.acknowledge();
+
+ if (msgCount % 5 == 0)
+ {
+ log.info("commit");
+ sessionConsumer.commit();
+ }
+ }
+
+ sessionConsumer.commit();
+
+ sessionConsumer.close();
+
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testMissingTXEverythingAcked() throws Exception
{
clearData();
@@ -1281,7 +1524,7 @@
sf.close();
locator.close();
}
-
+
server = createServer(true,
config,
PagingTest.PAGE_SIZE,
@@ -1353,7 +1596,6 @@
t.start();
t.join();
-
assertEquals(0, errors.get());
for (int i = 0; i < 20 && server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging(); i++)
@@ -1361,9 +1603,8 @@
// The delete may be asynchronous, giving some time case it eventually happen asynchronously
Thread.sleep(500);
}
-
- assertFalse (server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging());
+ assertFalse(server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging());
for (int i = 0; i < 20 && server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
{
@@ -1372,7 +1613,6 @@
}
assertEquals(0, server.getPostOffice().getPagingManager().getTransactions().size());
-
}
finally
13 years, 5 months
JBoss hornetq SVN: r11012 - in branches/Branch_2_2_AS7: src/main/org/hornetq/core/server/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-20 21:16:45 -0400 (Wed, 20 Jul 2011)
New Revision: 11012
Added:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
HORNETQ-725, HORNETQ-744, HORNETQ-743 - a few paging fixes
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-07-21 01:04:51 UTC (rev 11011)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-07-21 01:16:45 UTC (rev 11012)
@@ -89,6 +89,10 @@
public synchronized PageSubscription createSubscription(long cursorID, Filter filter, boolean persistent)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this.pagingStore.getAddress() + " creating subscription " + cursorID + " with filter " + filter, new Exception ("trace"));
+ }
PageSubscription activeCursor = activeCursors.get(cursorID);
if (activeCursor != null)
{
@@ -330,6 +334,11 @@
{
return;
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Asserting cleanup for address " + this.pagingStore.getAddress());
+ }
ArrayList<PageSubscription> cursorList = new ArrayList<PageSubscription>();
cursorList.addAll(activeCursors.values());
@@ -344,9 +353,21 @@
{
if (!cursor.isComplete(minPage))
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Cursor " + cursor + " was considered incomplete at page " + minPage);
+ }
+
complete = false;
break;
}
+ else
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("Cursor " + cursor + "was considered **complete** at page " + minPage);
+ }
+ }
}
if (complete)
@@ -516,12 +537,21 @@
for (PageSubscription cursor : cursorList)
{
long firstPage = cursor.getFirstPage();
+ if (log.isDebugEnabled())
+ {
+ log.debug(this.pagingStore.getAddress() + " has a cursor " + cursor + " with first page=" + firstPage);
+ }
if (firstPage < minPage)
{
minPage = firstPage;
}
}
+ if (log.isDebugEnabled())
+ {
+ log.debug(this.pagingStore.getAddress() + " has minPage=" + minPage);
+ }
+
return minPage;
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-07-21 01:04:51 UTC (rev 11011)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-07-21 01:16:45 UTC (rev 11012)
@@ -308,7 +308,7 @@
@Override
public String toString()
{
- return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue + "]";
+ return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue + ", filter = " + filter + "]";
}
@@ -648,22 +648,42 @@
Collections.sort(recoveredACK);
boolean first = true;
+
+ long txDeleteCursorOnReload = -1;
for (PagePosition pos : recoveredACK)
{
lastAckedPosition = pos;
- PageCursorInfo positions = getPageInfo(pos);
- if (first)
+ PageCursorInfo pageInfo = getPageInfo(pos);
+
+ if (pageInfo == null)
{
- first = false;
- if (pos.getMessageNr() > 0)
+ log.warn("Couldn't find page cache for page " + pos + ", removing it from the journal");
+ if (txDeleteCursorOnReload == -1)
{
- positions.confirmed.addAndGet(pos.getMessageNr());
+ txDeleteCursorOnReload = store.generateUniqueID();
}
+ store.deleteCursorAcknowledgeTransactional(txDeleteCursorOnReload, pos.getRecordID());
+ }
+ else
+ {
+ if (first)
+ {
+ first = false;
+ if (pos.getMessageNr() > 0)
+ {
+ pageInfo.confirmed.addAndGet(pos.getMessageNr());
+ }
+ }
+
+ pageInfo.addACK(pos);
}
-
- positions.addACK(pos);
}
+
+ if (txDeleteCursorOnReload >= 0)
+ {
+ store.commit(txDeleteCursorOnReload);
+ }
recoveredACK.clear();
recoveredACK = null;
@@ -723,6 +743,10 @@
if (create && pageInfo == null)
{
PageCache cache = cursorProvider.getPageCache(pos);
+ if (cache == null)
+ {
+ return null;
+ }
pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(), cache);
consumedPages.put(pos.getPageNr(), pageInfo);
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-21 01:04:51 UTC (rev 11011)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-21 01:16:45 UTC (rev 11012)
@@ -1582,28 +1582,31 @@
{
queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
- Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
-
- PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(), filter, true);
+ if (queueBindingInfo.getFilterString() == null || !queueBindingInfo.getFilterString().toString().equals(GENERIC_IGNORED_FILTER))
+ {
+ Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
+
+ PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(), filter, true);
+
+ Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
+ queueBindingInfo.getAddress(),
+ queueBindingInfo.getQueueName(),
+ filter,
+ subscription,
+ true,
+ false);
+
+ Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());
+
+ queues.put(queueBindingInfo.getId(), queue);
+
+ postOffice.addBinding(binding);
+
+ managementService.registerAddress(queueBindingInfo.getAddress());
+ managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
+ }
- Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
- queueBindingInfo.getAddress(),
- queueBindingInfo.getQueueName(),
- filter,
- subscription,
- true,
- false);
-
- Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());
-
- queues.put(queueBindingInfo.getId(), queue);
-
- postOffice.addBinding(binding);
-
- managementService.registerAddress(queueBindingInfo.getAddress());
- managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
-
}
for (GroupingInfo groupingInfo : groupingInfos)
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-07-21 01:04:51 UTC (rev 11011)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-07-21 01:16:45 UTC (rev 11012)
@@ -260,7 +260,7 @@
// If updateDeliveries = false (set by strict-update),
// the updateDeliveryCount would still be updated after c
- if (strictUpdateDeliveryCount)
+ if (strictUpdateDeliveryCount && !ref.isPaged())
{
if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue())
{
Added: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java (rev 0)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java 2011-07-21 01:16:45 UTC (rev 11012)
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.JMSTestBase;
+
+public class JMSPagingFileDeleteTest extends JMSTestBase
+{
+ static Logger log = Logger.getLogger(JMSPagingFileDeleteTest.class);
+
+ Topic topic1;
+
+ Connection connection;
+
+ Session session;
+
+ MessageConsumer subscriber1;
+
+ MessageConsumer subscriber2;
+
+ PagingStore pagingStore;
+
+ private static final int MESSAGE_SIZE = 1024;
+
+ private static final int PAGE_SIZE = 10 * 1024;
+
+ private static final int PAGE_MAX = 20 * 1024;
+
+ private static final int RECEIVE_TIMEOUT = 500;
+
+ private static final int MESSAGE_NUM = 50;
+
+ @Override
+ protected boolean usePersistence()
+ {
+ return true;
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ clearData();
+ super.setUp();
+
+ topic1 = createTopic("topic1");
+
+ // Paging Setting
+ AddressSettings setting = new AddressSettings();
+ setting.setPageSizeBytes(JMSPagingFileDeleteTest.PAGE_SIZE);
+ setting.setMaxSizeBytes(JMSPagingFileDeleteTest.PAGE_MAX);
+ server.getAddressSettingsRepository().addMatch("#", setting);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ topic1 = null;
+ super.tearDown();
+ }
+
+ public void testTopics() throws Exception
+ {
+ connection = null;
+
+ try
+ {
+ connection = cf.createConnection();
+ connection.setClientID("cid");
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(topic1);
+ subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
+ subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
+
+ // -----------------(Step1) Publish Messages to make Paging Files. --------------------
+ System.out.println("---------- Send messages. ----------");
+ BytesMessage bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
+ for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
+ {
+ producer.send(bytesMessage);
+ }
+ System.out.println("Sent " + JMSPagingFileDeleteTest.MESSAGE_NUM + " messages.");
+
+ pagingStore = server.getPagingManager().getPageStore(new SimpleString("jms.topic.topic1"));
+ printPageStoreInfo(pagingStore);
+
+ assertTrue(pagingStore.isPaging());
+
+ connection.start();
+
+ // -----------------(Step2) Restart the server. --------------------------------------
+ stopAndStartServer(); // If try this test without restarting server, please comment out this line;
+
+ // -----------------(Step3) Subscribe to all the messages from the topic.--------------
+ System.out.println("---------- Receive all messages. ----------");
+ for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
+ {
+ Message message1 = subscriber1.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT);
+ assertNotNull(message1);
+ Message message2 = subscriber2.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT);
+ assertNotNull(message2);
+ }
+
+ pagingStore = server.getPagingManager().getPageStore(new SimpleString("jms.topic.topic1"));
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && pagingStore.isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse(pagingStore.isPaging());
+
+ printPageStoreInfo(pagingStore);
+
+ assertEquals(0, pagingStore.getAddressSize());
+ // assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page is 1, but It was not.
+ assertFalse(pagingStore.isPaging()); // I expected IsPaging is false, but It was true.
+ // If the server is not restart, this test pass.
+
+ // -----------------(Step4) Publish a message. the message is stored in the paging file.
+ producer = session.createProducer(topic1);
+ bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
+ producer.send(bytesMessage);
+
+ printPageStoreInfo(pagingStore);
+
+ assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page is 1, but It was not.
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private void stopAndStartServer() throws Exception
+ {
+ System.out.println("---------- Restart server. ----------");
+ connection.close();
+
+ jmsServer.stop();
+
+ jmsServer.start();
+ jmsServer.activated();
+ registerConnectionFactory();
+
+ printPageStoreInfo(pagingStore);
+ reconnect();
+ }
+
+ private void reconnect() throws Exception
+ {
+ connection = cf.createConnection();
+ connection.setClientID("cid");
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
+ subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
+ connection.start();
+ }
+
+ private void printPageStoreInfo(PagingStore pagingStore) throws Exception
+ {
+ System.out.println("---------- Paging Store Info ----------");
+ System.out.println(" CurrentPage = " + pagingStore.getCurrentPage());
+ System.out.println(" FirstPage = " + pagingStore.getFirstPage());
+ System.out.println(" Number of Pages = " + pagingStore.getNumberOfPages());
+ System.out.println(" Address Size = " + pagingStore.getAddressSize());
+ System.out.println(" Is Paging = " + pagingStore.isPaging());
+ }
+}
\ No newline at end of file
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-07-21 01:04:51 UTC (rev 11011)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-07-21 01:16:45 UTC (rev 11012)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration.client;
+import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -497,6 +498,248 @@
}
+ /**
+ * This test will remove all the page directories during a restart, simulating a crash scenario. The server should still start after this
+ */
+ public void testDeletePhisicalPages() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+ config.setPersistDeliveryCountBeforeDelivery(true);
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 1000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ session.close();
+
+ session = null;
+
+ sf.close();
+ locator.close();
+
+ server.stop();
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+
+ Queue queue = server.locateQueue(ADDRESS);
+
+ assertEquals(numberOfMessages, queue.getMessageCount());
+
+ LinkedList<Xid> xids = new LinkedList<Xid>();
+
+ int msgReceived = 0;
+ ClientSession sessionConsumer = sf.createSession(false, false, false);
+ sessionConsumer.start();
+ ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+ for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
+ {
+ log.info("Received " + msgCount);
+ msgReceived++;
+ ClientMessage msg = consumer.receiveImmediate();
+ if (msg == null)
+ {
+ log.info("It's null. leaving now");
+ sessionConsumer.commit();
+ fail("Didn't receive a message");
+ }
+ msg.acknowledge();
+
+ if (msgCount % 5 == 0)
+ {
+ log.info("commit");
+ sessionConsumer.commit();
+ }
+ }
+
+ sessionConsumer.commit();
+
+ sessionConsumer.close();
+
+ sf.close();
+
+ locator.close();
+
+ assertEquals(0, queue.getMessageCount());
+
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
+
+ server.stop();
+
+ // Deleting the paging data. Simulating a failure
+ // a dumb user, or anything that will remove the data
+ deleteDirectory(new File(getPageDir()));
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+
+ locator = createInVMNonHALocator();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ sf = locator.createSessionFactory();
+
+ queue = server.locateQueue(ADDRESS);
+
+ sf = locator.createSessionFactory();
+ session = sf.createSession(false, false, false);
+
+ producer = session.createProducer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ server.stop();
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+
+ queue = server.locateQueue(ADDRESS);
+
+ // assertEquals(numberOfMessages, queue.getMessageCount());
+
+ xids = new LinkedList<Xid>();
+
+ msgReceived = 0;
+ sessionConsumer = sf.createSession(false, false, false);
+ sessionConsumer.start();
+ consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+ for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
+ {
+ log.info("Received " + msgCount);
+ msgReceived++;
+ ClientMessage msg = consumer.receiveImmediate();
+ if (msg == null)
+ {
+ log.info("It's null. leaving now");
+ sessionConsumer.commit();
+ fail("Didn't receive a message");
+ }
+ msg.acknowledge();
+
+ if (msgCount % 5 == 0)
+ {
+ log.info("commit");
+ sessionConsumer.commit();
+ }
+ }
+
+ sessionConsumer.commit();
+
+ sessionConsumer.close();
+
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testMissingTXEverythingAcked() throws Exception
{
clearData();
@@ -1281,7 +1524,7 @@
sf.close();
locator.close();
}
-
+
server = createServer(true,
config,
PagingTest.PAGE_SIZE,
@@ -1353,7 +1596,6 @@
t.start();
t.join();
-
assertEquals(0, errors.get());
for (int i = 0; i < 20 && server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging(); i++)
@@ -1361,9 +1603,8 @@
// The delete may be asynchronous, giving some time case it eventually happen asynchronously
Thread.sleep(500);
}
-
- assertFalse (server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging());
+ assertFalse(server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging());
for (int i = 0; i < 20 && server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
{
@@ -1372,7 +1613,6 @@
}
assertEquals(0, server.getPostOffice().getPagingManager().getTransactions().size());
-
}
finally
13 years, 5 months
JBoss hornetq SVN: r11011 - branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/config/common.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-20 21:04:51 -0400 (Wed, 20 Jul 2011)
New Revision: 11011
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/config/common/hornetq-version.properties
Log:
version
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/config/common/hornetq-version.properties
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/config/common/hornetq-version.properties 2011-07-21 01:04:03 UTC (rev 11010)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/config/common/hornetq-version.properties 2011-07-21 01:04:51 UTC (rev 11011)
@@ -1,4 +1,4 @@
-hornetq.version.versionName=HQ_2_2_5_GA_EAP
+hornetq.version.versionName=HQ_2_2_5_GA_EAP_JBPAPP-6871
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=5
13 years, 5 months
JBoss hornetq SVN: r11010 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871: src/main/org/hornetq/core/server/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-20 21:04:03 -0400 (Wed, 20 Jul 2011)
New Revision: 11010
Added:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
JBPAPP-6871, HORNETQ-725, HORNETQ-744, HORNETQ-743 - a few paging fixes
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-07-21 00:57:58 UTC (rev 11009)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-07-21 01:04:03 UTC (rev 11010)
@@ -89,6 +89,10 @@
public synchronized PageSubscription createSubscription(long cursorID, Filter filter, boolean persistent)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this.pagingStore.getAddress() + " creating subscription " + cursorID + " with filter " + filter, new Exception ("trace"));
+ }
PageSubscription activeCursor = activeCursors.get(cursorID);
if (activeCursor != null)
{
@@ -330,6 +334,11 @@
{
return;
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Asserting cleanup for address " + this.pagingStore.getAddress());
+ }
ArrayList<PageSubscription> cursorList = new ArrayList<PageSubscription>();
cursorList.addAll(activeCursors.values());
@@ -344,9 +353,21 @@
{
if (!cursor.isComplete(minPage))
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Cursor " + cursor + " was considered incomplete at page " + minPage);
+ }
+
complete = false;
break;
}
+ else
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("Cursor " + cursor + "was considered **complete** at page " + minPage);
+ }
+ }
}
if (complete)
@@ -516,12 +537,21 @@
for (PageSubscription cursor : cursorList)
{
long firstPage = cursor.getFirstPage();
+ if (log.isDebugEnabled())
+ {
+ log.debug(this.pagingStore.getAddress() + " has a cursor " + cursor + " with first page=" + firstPage);
+ }
if (firstPage < minPage)
{
minPage = firstPage;
}
}
+ if (log.isDebugEnabled())
+ {
+ log.debug(this.pagingStore.getAddress() + " has minPage=" + minPage);
+ }
+
return minPage;
}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-07-21 00:57:58 UTC (rev 11009)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-07-21 01:04:03 UTC (rev 11010)
@@ -308,7 +308,7 @@
@Override
public String toString()
{
- return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue + "]";
+ return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue + ", filter = " + filter + "]";
}
@@ -648,22 +648,42 @@
Collections.sort(recoveredACK);
boolean first = true;
+
+ long txDeleteCursorOnReload = -1;
for (PagePosition pos : recoveredACK)
{
lastAckedPosition = pos;
- PageCursorInfo positions = getPageInfo(pos);
- if (first)
+ PageCursorInfo pageInfo = getPageInfo(pos);
+
+ if (pageInfo == null)
{
- first = false;
- if (pos.getMessageNr() > 0)
+ log.warn("Couldn't find page cache for page " + pos + ", removing it from the journal");
+ if (txDeleteCursorOnReload == -1)
{
- positions.confirmed.addAndGet(pos.getMessageNr());
+ txDeleteCursorOnReload = store.generateUniqueID();
}
+ store.deleteCursorAcknowledgeTransactional(txDeleteCursorOnReload, pos.getRecordID());
+ }
+ else
+ {
+ if (first)
+ {
+ first = false;
+ if (pos.getMessageNr() > 0)
+ {
+ pageInfo.confirmed.addAndGet(pos.getMessageNr());
+ }
+ }
+
+ pageInfo.addACK(pos);
}
-
- positions.addACK(pos);
}
+
+ if (txDeleteCursorOnReload >= 0)
+ {
+ store.commit(txDeleteCursorOnReload);
+ }
recoveredACK.clear();
recoveredACK = null;
@@ -723,6 +743,10 @@
if (create && pageInfo == null)
{
PageCache cache = cursorProvider.getPageCache(pos);
+ if (cache == null)
+ {
+ return null;
+ }
pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(), cache);
consumedPages.put(pos.getPageNr(), pageInfo);
}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-21 00:57:58 UTC (rev 11009)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-21 01:04:03 UTC (rev 11010)
@@ -74,6 +74,7 @@
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.postoffice.QueueBinding;
import org.hornetq.core.postoffice.impl.DivertBinding;
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
@@ -1580,28 +1581,31 @@
{
queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
- Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
-
- PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(), filter, true);
+ if (queueBindingInfo.getFilterString() == null || !queueBindingInfo.getFilterString().toString().equals(GENERIC_IGNORED_FILTER))
+ {
+ Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
+
+ PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(), filter, true);
+
+ Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
+ queueBindingInfo.getAddress(),
+ queueBindingInfo.getQueueName(),
+ filter,
+ subscription,
+ true,
+ false);
+
+ Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());
+
+ queues.put(queueBindingInfo.getId(), queue);
+
+ postOffice.addBinding(binding);
+
+ managementService.registerAddress(queueBindingInfo.getAddress());
+ managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
+ }
- Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
- queueBindingInfo.getAddress(),
- queueBindingInfo.getQueueName(),
- filter,
- subscription,
- true,
- false);
-
- Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());
-
- queues.put(queueBindingInfo.getId(), queue);
-
- postOffice.addBinding(binding);
-
- managementService.registerAddress(queueBindingInfo.getAddress());
- managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
-
}
for (GroupingInfo groupingInfo : groupingInfos)
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-07-21 00:57:58 UTC (rev 11009)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-07-21 01:04:03 UTC (rev 11010)
@@ -260,7 +260,7 @@
// If updateDeliveries = false (set by strict-update),
// the updateDeliveryCount would still be updated after c
- if (strictUpdateDeliveryCount)
+ if (strictUpdateDeliveryCount && !ref.isPaged())
{
if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue())
{
Added: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java (rev 0)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java 2011-07-21 01:04:03 UTC (rev 11010)
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.JMSTestBase;
+
+public class JMSPagingFileDeleteTest extends JMSTestBase
+{
+ static Logger log = Logger.getLogger(JMSPagingFileDeleteTest.class);
+
+ Topic topic1;
+
+ Connection connection;
+
+ Session session;
+
+ MessageConsumer subscriber1;
+
+ MessageConsumer subscriber2;
+
+ PagingStore pagingStore;
+
+ private static final int MESSAGE_SIZE = 1024;
+
+ private static final int PAGE_SIZE = 10 * 1024;
+
+ private static final int PAGE_MAX = 20 * 1024;
+
+ private static final int RECEIVE_TIMEOUT = 500;
+
+ private static final int MESSAGE_NUM = 50;
+
+ @Override
+ protected boolean usePersistence()
+ {
+ return true;
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ clearData();
+ super.setUp();
+
+ topic1 = createTopic("topic1");
+
+ // Paging Setting
+ AddressSettings setting = new AddressSettings();
+ setting.setPageSizeBytes(JMSPagingFileDeleteTest.PAGE_SIZE);
+ setting.setMaxSizeBytes(JMSPagingFileDeleteTest.PAGE_MAX);
+ server.getAddressSettingsRepository().addMatch("#", setting);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ topic1 = null;
+ super.tearDown();
+ }
+
+ public void testTopics() throws Exception
+ {
+ connection = null;
+
+ try
+ {
+ connection = cf.createConnection();
+ connection.setClientID("cid");
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(topic1);
+ subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
+ subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
+
+ // -----------------(Step1) Publish Messages to make Paging Files. --------------------
+ System.out.println("---------- Send messages. ----------");
+ BytesMessage bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
+ for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
+ {
+ producer.send(bytesMessage);
+ }
+ System.out.println("Sent " + JMSPagingFileDeleteTest.MESSAGE_NUM + " messages.");
+
+ pagingStore = server.getPagingManager().getPageStore(new SimpleString("jms.topic.topic1"));
+ printPageStoreInfo(pagingStore);
+
+ assertTrue(pagingStore.isPaging());
+
+ connection.start();
+
+ // -----------------(Step2) Restart the server. --------------------------------------
+ stopAndStartServer(); // If try this test without restarting server, please comment out this line;
+
+ // -----------------(Step3) Subscribe to all the messages from the topic.--------------
+ System.out.println("---------- Receive all messages. ----------");
+ for (int i = 0; i < JMSPagingFileDeleteTest.MESSAGE_NUM; i++)
+ {
+ Message message1 = subscriber1.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT);
+ assertNotNull(message1);
+ Message message2 = subscriber2.receive(JMSPagingFileDeleteTest.RECEIVE_TIMEOUT);
+ assertNotNull(message2);
+ }
+
+ pagingStore = server.getPagingManager().getPageStore(new SimpleString("jms.topic.topic1"));
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && pagingStore.isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse(pagingStore.isPaging());
+
+ printPageStoreInfo(pagingStore);
+
+ assertEquals(0, pagingStore.getAddressSize());
+ // assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page is 1, but It was not.
+ assertFalse(pagingStore.isPaging()); // I expected IsPaging is false, but It was true.
+ // If the server is not restart, this test pass.
+
+ // -----------------(Step4) Publish a message. the message is stored in the paging file.
+ producer = session.createProducer(topic1);
+ bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(new byte[JMSPagingFileDeleteTest.MESSAGE_SIZE]);
+ producer.send(bytesMessage);
+
+ printPageStoreInfo(pagingStore);
+
+ assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page is 1, but It was not.
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private void stopAndStartServer() throws Exception
+ {
+ System.out.println("---------- Restart server. ----------");
+ connection.close();
+
+ jmsServer.stop();
+
+ jmsServer.start();
+ jmsServer.activated();
+ registerConnectionFactory();
+
+ printPageStoreInfo(pagingStore);
+ reconnect();
+ }
+
+ private void reconnect() throws Exception
+ {
+ connection = cf.createConnection();
+ connection.setClientID("cid");
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ subscriber1 = session.createDurableSubscriber(topic1, "subscriber-1");
+ subscriber2 = session.createDurableSubscriber(topic1, "subscriber-2");
+ connection.start();
+ }
+
+ private void printPageStoreInfo(PagingStore pagingStore) throws Exception
+ {
+ System.out.println("---------- Paging Store Info ----------");
+ System.out.println(" CurrentPage = " + pagingStore.getCurrentPage());
+ System.out.println(" FirstPage = " + pagingStore.getFirstPage());
+ System.out.println(" Number of Pages = " + pagingStore.getNumberOfPages());
+ System.out.println(" Address Size = " + pagingStore.getAddressSize());
+ System.out.println(" Is Paging = " + pagingStore.isPaging());
+ }
+}
\ No newline at end of file
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-07-21 00:57:58 UTC (rev 11009)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-07-21 01:04:03 UTC (rev 11010)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration.client;
+import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -497,6 +498,248 @@
}
+ /**
+ * This test will remove all the page directories during a restart, simulating a crash scenario. The server should still start after this
+ */
+ public void testDeletePhisicalPages() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+ config.setPersistDeliveryCountBeforeDelivery(true);
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 1000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ session.close();
+
+ session = null;
+
+ sf.close();
+ locator.close();
+
+ server.stop();
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+
+ Queue queue = server.locateQueue(ADDRESS);
+
+ assertEquals(numberOfMessages, queue.getMessageCount());
+
+ LinkedList<Xid> xids = new LinkedList<Xid>();
+
+ int msgReceived = 0;
+ ClientSession sessionConsumer = sf.createSession(false, false, false);
+ sessionConsumer.start();
+ ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+ for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
+ {
+ log.info("Received " + msgCount);
+ msgReceived++;
+ ClientMessage msg = consumer.receiveImmediate();
+ if (msg == null)
+ {
+ log.info("It's null. leaving now");
+ sessionConsumer.commit();
+ fail("Didn't receive a message");
+ }
+ msg.acknowledge();
+
+ if (msgCount % 5 == 0)
+ {
+ log.info("commit");
+ sessionConsumer.commit();
+ }
+ }
+
+ sessionConsumer.commit();
+
+ sessionConsumer.close();
+
+ sf.close();
+
+ locator.close();
+
+ assertEquals(0, queue.getMessageCount());
+
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
+
+ server.stop();
+
+ // Deleting the paging data. Simulating a failure
+ // a dumb user, or anything that will remove the data
+ deleteDirectory(new File(getPageDir()));
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+
+ locator = createInVMNonHALocator();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ sf = locator.createSessionFactory();
+
+ queue = server.locateQueue(ADDRESS);
+
+ sf = locator.createSessionFactory();
+ session = sf.createSession(false, false, false);
+
+ producer = session.createProducer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ server.stop();
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+
+ queue = server.locateQueue(ADDRESS);
+
+ // assertEquals(numberOfMessages, queue.getMessageCount());
+
+ xids = new LinkedList<Xid>();
+
+ msgReceived = 0;
+ sessionConsumer = sf.createSession(false, false, false);
+ sessionConsumer.start();
+ consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+ for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
+ {
+ log.info("Received " + msgCount);
+ msgReceived++;
+ ClientMessage msg = consumer.receiveImmediate();
+ if (msg == null)
+ {
+ log.info("It's null. leaving now");
+ sessionConsumer.commit();
+ fail("Didn't receive a message");
+ }
+ msg.acknowledge();
+
+ if (msgCount % 5 == 0)
+ {
+ log.info("commit");
+ sessionConsumer.commit();
+ }
+ }
+
+ sessionConsumer.commit();
+
+ sessionConsumer.close();
+
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testMissingTXEverythingAcked() throws Exception
{
clearData();
@@ -1281,7 +1524,7 @@
sf.close();
locator.close();
}
-
+
server = createServer(true,
config,
PagingTest.PAGE_SIZE,
@@ -1353,7 +1596,6 @@
t.start();
t.join();
-
assertEquals(0, errors.get());
for (int i = 0; i < 20 && server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging(); i++)
@@ -1361,9 +1603,8 @@
// The delete may be asynchronous, giving some time case it eventually happen asynchronously
Thread.sleep(500);
}
-
- assertFalse (server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging());
+ assertFalse(server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging());
for (int i = 0; i < 20 && server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
{
@@ -1372,7 +1613,6 @@
}
assertEquals(0, server.getPostOffice().getPagingManager().getTransactions().size());
-
}
finally
13 years, 5 months
JBoss hornetq SVN: r11009 - branches/one-offs.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-20 20:57:58 -0400 (Wed, 20 Jul 2011)
New Revision: 11009
Added:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_6871/
Log:
One off for JBPAPP-6871
13 years, 5 months
JBoss hornetq SVN: r11008 - tags.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-20 12:14:57 -0400 (Wed, 20 Jul 2011)
New Revision: 11008
Added:
tags/HornetQ_2_2_6_Final/
Removed:
tags/HornetQ_2_2_6_Final_pending/
Log:
Renaming file
13 years, 5 months
JBoss hornetq SVN: r11007 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-20 02:30:24 -0400 (Wed, 20 Jul 2011)
New Revision: 11007
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Fixing OneWayClusterTest
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-20 06:02:09 UTC (rev 11006)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-20 06:30:24 UTC (rev 11007)
@@ -45,6 +45,7 @@
import org.hornetq.core.cluster.DiscoveryListener;
import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.FailureListener;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.UUIDGenerator;
@@ -156,6 +157,8 @@
private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
private static ExecutorService globalThreadPool;
+
+ private static Executor startExecutor;
private static ScheduledExecutorService globalScheduledThreadPool;
@@ -168,14 +171,14 @@
private boolean backup;
private final Exception e = new Exception();
-
+
// To be called when there are ServerLocator being finalized.
// To be used on test assertions
public static Runnable finalizeCallback = null;
-
+
public static synchronized void clearThreadPools()
{
-
+
if (globalThreadPool != null)
{
globalThreadPool.shutdown();
@@ -194,7 +197,7 @@
globalThreadPool = null;
}
}
-
+
if (globalScheduledThreadPool != null)
{
globalScheduledThreadPool.shutdown();
@@ -471,6 +474,8 @@
public void start(Executor executor) throws Exception
{
initialise();
+
+ this.startExecutor = executor;
executor.execute(new Runnable()
{
@@ -1106,7 +1111,7 @@
{
staticConnector.disconnect();
}
-
+
Set<ClientSessionFactory> clonedFactory = new HashSet<ClientSessionFactory>(factories);
for (ClientSessionFactory factory : clonedFactory)
@@ -1231,7 +1236,7 @@
// Notify if waiting on getting topology
notify();
}
-
+
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@@ -1297,7 +1302,7 @@
public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
topologyListeners.add(listener);
- if(topology.members() > 0)
+ if (topology.members() > 0)
{
log.debug("ServerLocatorImpl.addClusterTopologyListener");
}
@@ -1360,37 +1365,57 @@
try
{
-
+
int retryNumber = 0;
while (csf == null && !ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing)
{
- retryNumber ++;
+ retryNumber++;
for (Connector conn : connectors)
{
- if (log.isDebugEnabled())
- {
- log.debug("Submitting connect towards " + conn);
- }
-
- csf = conn.tryConnect();
-
- if (csf != null)
- {
- return csf;
- }
+ if (log.isDebugEnabled())
+ {
+ log.debug("Submitting connect towards " + conn);
+ }
+
+ csf = conn.tryConnect();
+
+ if (csf != null)
+ {
+ csf.getConnection().addFailureListener(new FailureListener()
+ {
+ // Case the node where we were connected is gone, we need to restart the connection
+ public void connectionFailed(HornetQException exception, boolean failedOver)
+ {
+ if (exception.getCode() == HornetQException.DISCONNECTED)
+ {
+ try
+ {
+ ServerLocatorImpl.this.start(startExecutor);
+ }
+ catch (Exception e)
+ {
+ // There isn't much to be done if this happens here
+ log.warn(e.getMessage());
+ }
+ }
+ }
+ });
+
+ return csf;
+ }
}
-
- if (initialConnectAttempts >=0 && retryNumber > initialConnectAttempts)
+
+ if (initialConnectAttempts >= 0 && retryNumber > initialConnectAttempts)
{
break;
}
-
+
if (!closed && !closing)
{
- Thread.sleep (retryInterval);
+ Thread.sleep(retryInterval);
}
}
-
+
}
catch (Exception e)
{
@@ -1444,7 +1469,7 @@
System.identityHashCode(this));
log.warn("The ServerLocator you didn't close was created here:", e);
-
+
if (ServerLocatorImpl.finalizeCallback != null)
{
ServerLocatorImpl.finalizeCallback.run();
@@ -1456,7 +1481,7 @@
super.finalize();
}
- class Connector
+ class Connector
{
private TransportConfiguration initialConnector;
@@ -1510,9 +1535,7 @@
{
return "Connector [initialConnector=" + initialConnector + "]";
}
-
-
-
+
}
}
}
13 years, 5 months