[hornetq-commits] JBoss hornetq SVN: r10153 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging and 6 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Jan 27 21:07:00 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-01-27 21:07:00 -0500 (Thu, 27 Jan 2011)
New Revision: 10153
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
JBPAPP-5523 - Paging settings
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -114,6 +114,7 @@
final NotificationBroadcasterSupport broadcaster) throws Exception
{
super(HornetQServerControl.class, storageManager);
+ new Exception(".....").printStackTrace();
this.postOffice = postOffice;
this.configuration = configuration;
this.resourceManager = resourceManager;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingManager.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingManager.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -19,6 +19,7 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.settings.HierarchicalRepositoryChangeListener;
/**
*
@@ -44,7 +45,7 @@
* @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
*
*/
-public interface PagingManager extends HornetQComponent
+public interface PagingManager extends HornetQComponent, HierarchicalRepositoryChangeListener
{
/** To return the PageStore associated with the address */
PagingStore getPageStore(SimpleString address) throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -20,6 +20,7 @@
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
/**
*
@@ -54,6 +55,8 @@
long getAddressSize();
long getMaxSize();
+
+ void applySetting(AddressSettings addressSettings);
boolean isPaging();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -69,15 +69,38 @@
{
pagingStoreFactory = pagingSPI;
this.addressSettingsRepository = addressSettingsRepository;
+ addressSettingsRepository.registerListener(this);
this.storageManager = storageManager;
}
// Public
// ---------------------------------------------------------------------------------------------------------------------------
+
+ // Hierarchical changes listener
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
+ */
+ public void onChange()
+ {
+ reaplySettings();
+ }
+
+
+
// PagingManager implementation
// -----------------------------------------------------------------------------------------------------
+ public void reaplySettings()
+ {
+ for (PagingStore store : stores.values())
+ {
+ AddressSettings settings = this.addressSettingsRepository.getMatch(store.getAddress().toString());
+ store.applySetting(settings);
+ }
+ }
+
public SimpleString[] getStoreNames()
{
Set<SimpleString> names = stores.keySet();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -22,7 +22,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
@@ -45,7 +44,6 @@
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.RouteContextList;
@@ -97,11 +95,11 @@
// Used to schedule sync threads
private final PageSyncTimer syncTimer;
- private final long maxSize;
+ private long maxSize;
- private final long pageSize;
+ private long pageSize;
- private final AddressFullMessagePolicy addressFullMessagePolicy;
+ private AddressFullMessagePolicy addressFullMessagePolicy;
private boolean printedDropMessagesWarning;
@@ -112,8 +110,6 @@
// Bytes consumed by the queue on the memory
private final AtomicLong sizeInBytes = new AtomicLong();
- private final AtomicBoolean depaging = new AtomicBoolean(false);
-
private volatile int numberOfPages;
private volatile int firstPageId;
@@ -124,9 +120,6 @@
private volatile boolean paging = false;
- /** duplicate cache used at this address */
- private final DuplicateIDCache duplicateCache;
-
private final PageCursorProvider cursorProvider;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -162,6 +155,7 @@
final ExecutorFactory executorFactory,
final boolean syncNonTransactional)
{
+ new Exception("new pageStore for " + address).printStackTrace();
if (pagingManager == null)
{
throw new IllegalStateException("Paging Manager can't be null");
@@ -175,12 +169,8 @@
this.storeName = storeName;
- maxSize = addressSettings.getMaxSizeBytes();
+ applySetting(addressSettings);
- pageSize = addressSettings.getPageSizeBytes();
-
- addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
-
if (addressFullMessagePolicy == AddressFullMessagePolicy.PAGE && maxSize != -1 && pageSize >= maxSize)
{
throw new IllegalStateException("pageSize for address " + address +
@@ -212,16 +202,18 @@
this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory);
- // Post office could be null on the backup node
- if (postOffice == null)
- {
- this.duplicateCache = null;
- }
- else
- {
- this.duplicateCache = postOffice.getDuplicateIDCache(storeName);
- }
+ }
+ /**
+ * @param addressSettings
+ */
+ public void applySetting(final AddressSettings addressSettings)
+ {
+ maxSize = addressSettings.getMaxSizeBytes();
+
+ pageSize = addressSettings.getPageSizeBytes();
+
+ addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
}
// Public --------------------------------------------------------
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -17,17 +17,29 @@
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -36,7 +48,11 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -307,7 +323,7 @@
assertEquals(i, msg.getIntProperty("id").intValue());
msg.acknowledge();
}
-
+
assertNull(cons.receiveImmediate());
sess.close();
sl.close();
@@ -348,22 +364,20 @@
assertEquals(numberOfMessages, q2.getMessageCount());
assertEquals(0, q1.getMessageCount());
-
-
+
session.close();
sf.close();
locator.close();
-
+
server.stop();
-
-
+
server.start();
-
+
Bindings bindings = server.getPostOffice().getBindingsForAddress(ADDRESS);
-
+
q1 = null;
q2 = null;
-
+
for (Binding bind : bindings.getBindings())
{
if (bind instanceof LocalQueueBinding)
@@ -373,25 +387,20 @@
{
q1 = qb.getQueue();
}
-
+
if (qb.getQueue().getName().equals(new SimpleString("inactive")))
{
q2 = qb.getQueue();
}
}
}
-
+
assertNotNull(q1);
-
+
assertNotNull(q2);
-
assertEquals(numberOfMessages, q2.getMessageCount());
assertEquals(0, q1.getMessageCount());
-
-
-
-
}
catch (Throwable e)
@@ -510,7 +519,7 @@
}
session.commit();
-
+
q1.getMessageCount();
t1.start();
@@ -518,10 +527,10 @@
assertEquals(0, errors.get());
long timeout = System.currentTimeMillis() + 10000;
- while (numberOfMessages -100 != q1.getMessageCount() && System.currentTimeMillis() < timeout)
+ while (numberOfMessages - 100 != q1.getMessageCount() && System.currentTimeMillis() < timeout)
{
Thread.sleep(500);
-
+
}
assertEquals(numberOfMessages, q2.getMessageCount());
@@ -833,8 +842,154 @@
{
}
}
+ }
+ public void testPagingOverCreatedDestinationTopics() throws Exception
+ {
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true, config, PAGE_SIZE, -1, new HashMap<String, AddressSettings>());
+
+ JMSServerManagerImpl jmsServer = new JMSServerManagerImpl(server);
+ InVMContext context = new InVMContext();
+ jmsServer.setContext(context);
+ jmsServer.start();
+
+ jmsServer.createTopic(true, "tt", "/topic/TT");
+
+ server.getHornetQServerControl().addAddressSettings("jms.topic.TT",
+ "DLQ",
+ "DLQ",
+ false,
+ 5,
+ 1024 * 1024,
+ 1024 * 10,
+ 5,
+ 0,
+ false,
+ "PAGE");
+
+ HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory)HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
+ new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+ Connection conn = cf.createConnection();
+ conn.setClientID("tst");
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = (Topic)context.lookup("/topic/TT");
+ sess.createDurableSubscriber(topic, "t1");
+
+ MessageProducer prod = sess.createProducer(topic);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ TextMessage txt = sess.createTextMessage("TST");
+ prod.send(txt);
+
+ PagingStore store = server.getPagingManager().getPageStore(new SimpleString("jms.topic.TT"));
+
+ assertEquals(1024 * 1024, store.getMaxSize());
+ assertEquals(10 * 1024, store.getPageSizeBytes());
+
+ jmsServer.stop();
+
+ server = createServer(true, config, PAGE_SIZE, -1, new HashMap<String, AddressSettings>());
+
+ jmsServer = new JMSServerManagerImpl(server);
+ context = new InVMContext();
+ jmsServer.setContext(context);
+ jmsServer.start();
+
+ AddressSettings settings = server.getAddressSettingsRepository().getMatch("jms.topic.TT");
+
+ assertEquals(1024 * 1024, settings.getMaxSizeBytes());
+ assertEquals(10 * 1024, settings.getPageSizeBytes());
+ assertEquals(AddressFullMessagePolicy.PAGE, settings.getAddressFullMessagePolicy());
+
+ store = server.getPagingManager().getPageStore(new SimpleString("TT"));
+
+ server.stop();
+
}
+
+ public void testPagingOverCreatedDestinationQueues() throws Exception
+ {
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true, config, -1, -1, AddressFullMessagePolicy.BLOCK, new HashMap<String, AddressSettings>());
+
+ JMSServerManagerImpl jmsServer = new JMSServerManagerImpl(server);
+ InVMContext context = new InVMContext();
+ jmsServer.setContext(context);
+ jmsServer.start();
+
+ server.getHornetQServerControl().addAddressSettings("jms.queue.Q1",
+ "DLQ",
+ "DLQ",
+ false,
+ 5,
+ 100 * 1024,
+ 10 * 1024,
+ 5,
+ 0,
+ false,
+ "PAGE");
+
+ jmsServer.createQueue(true, "Q1", null, true, "/queue/Q1");
+
+ HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory)HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
+ new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+ Connection conn = cf.createConnection();
+ conn.setClientID("tst");
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue queue = (javax.jms.Queue)context.lookup("/queue/Q1");
+
+ MessageProducer prod = sess.createProducer(queue);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ BytesMessage bmt = sess.createBytesMessage();
+
+ bmt.writeBytes(new byte[1024]);
+
+ for (int i = 0 ; i < 500; i++)
+ {
+ prod.send(bmt);
+ }
+
+ PagingStore store = server.getPagingManager().getPageStore(new SimpleString("jms.queue.Q1"));
+
+ assertEquals(100 * 1024, store.getMaxSize());
+ assertEquals(10 * 1024, store.getPageSizeBytes());
+ assertEquals(AddressFullMessagePolicy.PAGE, store.getAddressFullMessagePolicy());
+
+ jmsServer.stop();
+
+ server = createServer(true, config, -1, -1, AddressFullMessagePolicy.BLOCK, new HashMap<String, AddressSettings>());
+
+ jmsServer = new JMSServerManagerImpl(server);
+ context = new InVMContext();
+ jmsServer.setContext(context);
+ jmsServer.start();
+
+ AddressSettings settings = server.getAddressSettingsRepository().getMatch("jms.queue.Q1");
+
+ assertEquals(100 * 1024, settings.getMaxSizeBytes());
+ assertEquals(10 * 1024, settings.getPageSizeBytes());
+ assertEquals(AddressFullMessagePolicy.PAGE, settings.getAddressFullMessagePolicy());
+
+ store = server.getPagingManager().getPageStore(new SimpleString("jms.queue.Q1"));
+ assertEquals(100 * 1024, store.getMaxSize());
+ assertEquals(10 * 1024, store.getPageSizeBytes());
+ assertEquals(AddressFullMessagePolicy.PAGE, store.getAddressFullMessagePolicy());
+
+
+ server.stop();
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -24,15 +24,12 @@
import junit.framework.Assert;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.ClusterConnectionControl;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.CoreQueueConfiguration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -1110,6 +1110,13 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
+ */
+ public void onChange()
+ {
+ }
+
}
class FakeStorageManager implements StorageManager
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -333,6 +333,13 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
+ */
+ public void onChange()
+ {
+ }
+
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -42,6 +42,7 @@
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.jms.client.HornetQBytesMessage;
import org.hornetq.jms.client.HornetQTextMessage;
@@ -203,6 +204,16 @@
final int maxAddressSize,
final Map<String, AddressSettings> settings)
{
+ return createServer(realFiles, configuration, pageSize, maxAddressSize, settings);
+ }
+
+ protected HornetQServer createServer(final boolean realFiles,
+ final Configuration configuration,
+ final int pageSize,
+ final int maxAddressSize,
+ final AddressFullMessagePolicy fullPolicy,
+ final Map<String, AddressSettings> settings)
+ {
HornetQServer server;
if (realFiles)
@@ -222,6 +233,7 @@
AddressSettings defaultSetting = new AddressSettings();
defaultSetting.setPageSizeBytes(pageSize);
defaultSetting.setMaxSizeBytes(maxAddressSize);
+ defaultSetting.setAddressFullMessagePolicy(fullPolicy);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
More information about the hornetq-commits
mailing list