JBoss hornetq SVN: r10051 - in trunk: tests/src/org/hornetq/tests/integration/jms/server/management and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-17 10:26:51 -0500 (Fri, 17 Dec 2010)
New Revision: 10051
Modified:
trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
Log:
fixing tests
Modified: trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2010-12-17 05:17:45 UTC (rev 10050)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2010-12-17 15:26:51 UTC (rev 10051)
@@ -13,7 +13,6 @@
package org.hornetq.api.jms.management;
-import java.util.List;
import java.util.Map;
import javax.management.MBeanOperationInfo;
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2010-12-17 05:17:45 UTC (rev 10050)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2010-12-17 15:26:51 UTC (rev 10051)
@@ -135,7 +135,7 @@
public long getMessageCount()
{
- return (Long)proxy.retrieveAttributeValue("messageCount");
+ return ((Number)proxy.retrieveAttributeValue("messageCount")).longValue();
}
public long getMessagesAdded()
14 years, 2 months
JBoss hornetq SVN: r10050 - trunk/src/main/org/hornetq/core/paging/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-17 00:17:45 -0500 (Fri, 17 Dec 2010)
New Revision: 10050
Modified:
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
Log:
more PageCounter changes
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-12-17 02:20:26 UTC (rev 10049)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-12-17 05:17:45 UTC (rev 10050)
@@ -50,6 +50,7 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.Transaction.State;
+import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.utils.ExecutorFactory;
@@ -876,6 +877,10 @@
lock.readLock().unlock();
}
+ Transaction tx = ctx.getTransaction();
+
+ boolean startedTx = false;
+
lock.writeLock().lock();
try
@@ -884,6 +889,12 @@
{
return false;
}
+
+ if (tx == null)
+ {
+ tx = new TransactionImpl(storageManager);
+ startedTx = true;
+ }
PagedMessage pagedMessage;
@@ -893,8 +904,6 @@
// This will force everything to be persisted
message.bodyChanged();
}
-
- Transaction tx = ctx.getTransaction();
pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), getTransactionID(tx, listCtx));
@@ -913,6 +922,10 @@
finally
{
lock.writeLock().unlock();
+ if (startedTx)
+ {
+ tx.commit();
+ }
}
}
14 years, 2 months
JBoss hornetq SVN: r10049 - in trunk: src/main/org/hornetq/api/core/client and 19 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-16 21:20:26 -0500 (Thu, 16 Dec 2010)
New Revision: 10049
Modified:
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
trunk/src/main/org/hornetq/api/core/client/ClientSession.java
trunk/src/main/org/hornetq/api/core/management/QueueControl.java
trunk/src/main/org/hornetq/api/jms/management/DestinationControl.java
trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/messagecounter/MessageCounter.java
trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/QueueQueryResult.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/Server.java
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
more PageCounter changes
Modified: trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
===================================================================
--- trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -49,9 +49,9 @@
// This may take some considerable time to create, send and consume - if it takes too long or you don't have
// enough disk space just reduce the file size here
- // private final long FILE_SIZE = 256L * 1024 * 1024;
+ private final long FILE_SIZE = 256L * 1024 * 1024;
- private final long FILE_SIZE = 10L * 1024 * 1024 * 1024; // 10 GiB message
+ //private final long FILE_SIZE = 10L * 1024 * 1024 * 1024; // 10 GiB message
@Override
public boolean runExample() throws Exception
Modified: trunk/src/main/org/hornetq/api/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ClientSession.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/api/core/client/ClientSession.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -74,7 +74,7 @@
/**
* Returns the number of messages in the queue.
*/
- int getMessageCount();
+ long getMessageCount();
/**
* Returns the queue's filter string (or <code>null</code> if the queue has no filter).
Modified: trunk/src/main/org/hornetq/api/core/management/QueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/management/QueueControl.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/api/core/management/QueueControl.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -60,7 +60,7 @@
/**
* Returns the number of messages currently in this queue.
*/
- int getMessageCount();
+ long getMessageCount();
/**
* Returns the number of scheduled messages in this queue.
@@ -142,7 +142,7 @@
* Using {@code null} or an empty filter will count <em>all</em> messages from this queue.
*/
@Operation(desc = "Returns the number of the messages in the queue matching the given filter", impact = MBeanOperationInfo.INFO)
- int countMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter) throws Exception;
+ long countMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter) throws Exception;
/**
* Removes the message corresponding to the specified message ID.
Modified: trunk/src/main/org/hornetq/api/jms/management/DestinationControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/DestinationControl.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/api/jms/management/DestinationControl.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -45,7 +45,7 @@
/**
* Returns the number of messages currently in this destination.
*/
- int getMessageCount() throws Exception;
+ long getMessageCount() throws Exception;
/**
* Returns the number of messages that this queue is currently delivering to its consumers.
Modified: trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -103,7 +103,7 @@
* Using {@code null} or an empty filter will count <em>all</em> messages from this queue.
*/
@Operation(desc = "Returns the number of the messages in the queue matching the given filter", impact = MBeanOperationInfo.INFO)
- int countMessages(@Parameter(name = "filter", desc = "A JMS message filter (can be empty)") String filter) throws Exception;
+ long countMessages(@Parameter(name = "filter", desc = "A JMS message filter (can be empty)") String filter) throws Exception;
/**
* Removes the message corresponding to the specified message ID.
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -1768,7 +1768,7 @@
private final boolean durable;
- private final int messageCount;
+ private final long messageCount;
private final SimpleString filterString;
@@ -1778,7 +1778,7 @@
public QueueQueryImpl(final boolean durable,
final int consumerCount,
- final int messageCount,
+ final long messageCount,
final SimpleString filterString,
final SimpleString address,
final boolean exists)
@@ -1807,7 +1807,7 @@
return filterString;
}
- public int getMessageCount()
+ public long getMessageCount()
{
return messageCount;
}
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -167,7 +167,7 @@
}
}
- public int getMessageCount()
+ public long getMessageCount()
{
checkStarted();
@@ -434,7 +434,7 @@
}
}
- public int countMessages(final String filterStr) throws Exception
+ public long countMessages(final String filterStr) throws Exception
{
checkStarted();
Modified: trunk/src/main/org/hornetq/core/messagecounter/MessageCounter.java
===================================================================
--- trunk/src/main/org/hornetq/core/messagecounter/MessageCounter.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/core/messagecounter/MessageCounter.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -66,7 +66,7 @@
private long countTotalLast;
- private int depthLast;
+ private long depthLast;
private long timeLastUpdate;
@@ -188,7 +188,7 @@
* Gets the current message count of pending messages
* within the destination waiting for dispatch
*/
- public int getMessageCount()
+ public long getMessageCount()
{
return destQueue.getMessageCount();
}
@@ -197,10 +197,10 @@
* Gets the message count delta of pending messages
* since last method call.
*/
- public int getMessageCountDelta()
+ public long getMessageCountDelta()
{
- int current = destQueue.getMessageCount();
- int delta = current - depthLast;
+ long current = destQueue.getMessageCount();
+ int delta = (int)(current - depthLast);
depthLast = current;
Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -61,12 +61,12 @@
void ack(PagedReference ref) throws Exception;
// for internal (cursor) classes
- void ack(PagePosition ref) throws Exception;
+ void confirmPosition(PagePosition ref) throws Exception;
void ackTx(Transaction tx, PagedReference position) throws Exception;
// for internal (cursor) classes
- void ackTx(Transaction tx, PagePosition position) throws Exception;
+ void confirmPosition(Transaction tx, PagePosition position) throws Exception;
/**
*
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -282,7 +282,7 @@
// First step: Move every cursor to the next bookmarked page (that was just created)
for (PageSubscription cursor : cursorList)
{
- cursor.ack(new PagePositionImpl(currentPage.getPageId(), -1));
+ cursor.confirmPosition(new PagePositionImpl(currentPage.getPageId(), -1));
}
storageManager.waitOnOperations();
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -168,7 +168,7 @@
cursorInfo.confirmed.addAndGet(position.getMessageNr());
}
- ack(position);
+ confirmPosition(position);
}
public PageSubscriptionCounter getCounter()
@@ -395,7 +395,7 @@
return new PagePositionImpl(pageStore.getFirstPage(), -1);
}
- public void ackTx(final Transaction tx, final PagePosition position) throws Exception
+ public void confirmPosition(final Transaction tx, final PagePosition position) throws Exception
{
// if the cursor is persistent
if (persistent)
@@ -408,7 +408,9 @@
public void ackTx(final Transaction tx, final PagedReference reference) throws Exception
{
- ackTx(tx, reference.getPosition());
+ confirmPosition(tx, reference.getPosition());
+
+ counter.increment(tx, -1);
PageTransactionInfo txInfo = getPageTransaction(reference);
if (txInfo != null)
@@ -422,15 +424,13 @@
*/
public void ack(final PagedReference reference) throws Exception
{
- ack(reference.getPosition());
- PageTransactionInfo txInfo = getPageTransaction(reference);
- if (txInfo != null)
- {
- txInfo.storeUpdate(this.store, pageStore.getPagingManager());
- }
+ // Need to do the ACK and counter atomically (inside a TX) or the counter could get out of sync
+ Transaction tx = new TransactionImpl(this.store);
+ ackTx(tx, reference);
+ tx.commit();
}
- public void ack(final PagePosition position) throws Exception
+ public void confirmPosition(final PagePosition position) throws Exception
{
// if we are dealing with a persistent cursor
if (persistent)
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -896,7 +896,7 @@
Transaction tx = ctx.getTransaction();
- pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx), getTransactionID(tx, listCtx));
+ pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), getTransactionID(tx, listCtx));
int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
@@ -917,20 +917,22 @@
}
- private long[] getQueueIDs(RouteContextList ctx)
+ private long[] routeQueues(Transaction tx, RouteContextList ctx) throws Exception
{
List<org.hornetq.core.server.Queue> durableQueues = ctx.getDurableQueues();
List<org.hornetq.core.server.Queue> nonDurableQueues = ctx.getNonDurableQueues();
long ids[] = new long [durableQueues.size() + nonDurableQueues.size()];
int i = 0;
-
+
for (org.hornetq.core.server.Queue q : durableQueues)
{
+ q.getPageSubscription().getCounter().increment(tx, 1);
ids[i++] = q.getID();
}
for (org.hornetq.core.server.Queue q : nonDurableQueues)
{
+ q.getPageSubscription().getCounter().increment(tx, 1);
ids[i++] = q.getID();
}
return ids;
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -35,7 +35,7 @@
private int consumerCount;
- private int messageCount;
+ private long messageCount;
private SimpleString filterString;
@@ -60,7 +60,7 @@
final boolean temporary,
final SimpleString filterString,
final int consumerCount,
- final int messageCount,
+ final long messageCount,
final boolean exists)
{
super(PacketImpl.SESS_QUEUEQUERY_RESP);
@@ -103,7 +103,7 @@
return consumerCount;
}
- public int getMessageCount()
+ public long getMessageCount()
{
return messageCount;
}
@@ -135,7 +135,7 @@
buffer.writeBoolean(durable);
buffer.writeBoolean(temporary);
buffer.writeInt(consumerCount);
- buffer.writeInt(messageCount);
+ buffer.writeLong(messageCount);
buffer.writeNullableSimpleString(filterString);
buffer.writeNullableSimpleString(address);
buffer.writeNullableSimpleString(name);
@@ -148,7 +148,7 @@
durable = buffer.readBoolean();
temporary = buffer.readBoolean();
consumerCount = buffer.readInt();
- messageCount = buffer.readInt();
+ messageCount = buffer.readLong();
filterString = buffer.readNullableSimpleString();
address = buffer.readNullableSimpleString();
name = buffer.readNullableSimpleString();
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -72,7 +72,7 @@
void deliverAsync();
- int getMessageCount();
+ long getMessageCount();
int getDeliveringCount();
Modified: trunk/src/main/org/hornetq/core/server/QueueQueryResult.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/QueueQueryResult.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/core/server/QueueQueryResult.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -32,7 +32,7 @@
private int consumerCount;
- private int messageCount;
+ private long messageCount;
private SimpleString filterString;
@@ -46,7 +46,7 @@
final boolean temporary,
final SimpleString filterString,
final int consumerCount,
- final int messageCount)
+ final long messageCount)
{
this(name, address, durable, temporary, filterString, consumerCount, messageCount, true);
}
@@ -62,7 +62,7 @@
final boolean temporary,
final SimpleString filterString,
final int consumerCount,
- final int messageCount,
+ final long messageCount,
final boolean exists)
{
this.durable = durable;
@@ -97,7 +97,7 @@
return consumerCount;
}
- public int getMessageCount()
+ public long getMessageCount()
{
return messageCount;
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -653,13 +653,13 @@
return null;
}
- public int getMessageCount()
+ public long getMessageCount()
{
blockOnExecutorFuture();
synchronized (this)
{
- return messageReferences.size() + getScheduledCount() + getDeliveringCount();
+ return messageReferences.size() + getScheduledCount() + getDeliveringCount() + pageSubscription.getCounter().getValue();
}
}
Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -115,7 +115,7 @@
return managedQueue.isTemporary();
}
- public int getMessageCount()
+ public long getMessageCount()
{
return coreQueueControl.getMessageCount();
}
@@ -225,7 +225,7 @@
return JMSQueueControlImpl.toJSON(listMessages(filter));
}
- public int countMessages(final String filterStr) throws Exception
+ public long countMessages(final String filterStr) throws Exception
{
String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
return coreQueueControl.countMessages(filter);
Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -113,7 +113,7 @@
return managedTopic.getAddress();
}
- public int getMessageCount()
+ public long getMessageCount()
{
return getMessageCount(DurabilityType.ALL);
}
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -384,7 +384,7 @@
public boolean checkEmpty(final Queue queue) throws Exception
{
- Integer messageCount = HornetQServerTestCase.servers.get(0).getMessageCountForQueue(queue.getQueueName());
+ Long messageCount = HornetQServerTestCase.servers.get(0).getMessageCountForQueue(queue.getQueueName());
if (messageCount > 0)
{
removeAllMessages(queue.getQueueName(), true);
@@ -409,7 +409,7 @@
protected boolean assertRemainingMessages(final int expected) throws Exception
{
- Integer messageCount = HornetQServerTestCase.servers.get(0).getMessageCountForQueue("Queue1");
+ Long messageCount = HornetQServerTestCase.servers.get(0).getMessageCountForQueue("Queue1");
ProxyAssertSupport.assertEquals(expected, messageCount.intValue());
return expected == messageCount.intValue();
@@ -501,7 +501,7 @@
return HornetQServerTestCase.servers.get(0).listAllSubscribersForTopic(s);
}
- protected Integer getMessageCountForQueue(final String s) throws Exception
+ protected Long getMessageCountForQueue(final String s) throws Exception
{
return HornetQServerTestCase.servers.get(0).getMessageCountForQueue(s);
}
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -73,7 +73,7 @@
conn.close();
- Integer i = getMessageCountForQueue("Queue1");
+ Long i = getMessageCountForQueue("Queue1");
ProxyAssertSupport.assertEquals(1, i.intValue());
}
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -396,7 +396,7 @@
}
- public Integer getMessageCountForQueue(final String queueName) throws Exception
+ public Long getMessageCountForQueue(final String queueName) throws Exception
{
JMSQueueControl queue = (JMSQueueControl)getHornetQServer().getManagementService()
.getResource(ResourceNames.JMS_QUEUE + queueName);
@@ -406,7 +406,7 @@
}
else
{
- return -1;
+ return -1l;
}
}
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/Server.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/Server.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/Server.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -176,7 +176,7 @@
void removeAllMessages(String destination, boolean isQueue) throws Exception;
- Integer getMessageCountForQueue(String queueName) throws Exception;
+ Long getMessageCountForQueue(String queueName) throws Exception;
List<String> listAllSubscribersForTopic(String s) throws Exception;
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -522,7 +522,7 @@
}
JMSQueueControl queueControl = (JMSQueueControl)managementService.getResource(ResourceNames.JMS_QUEUE + queue.getQueueName());
- Integer messageCount = queueControl.getMessageCount();
+ Long messageCount = queueControl.getMessageCount();
if (messageCount > 0)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -98,9 +98,9 @@
return (Integer)proxy.invokeOperation("changeMessagesPriority", filter, newPriority);
}
- public int countMessages(final String filter) throws Exception
+ public long countMessages(final String filter) throws Exception
{
- return (Integer)proxy.invokeOperation("countMessages", filter);
+ return (Long)proxy.invokeOperation("countMessages", filter);
}
public boolean expireMessage(final String messageID) throws Exception
@@ -133,9 +133,9 @@
return (String)proxy.retrieveAttributeValue("expiryAddress");
}
- public int getMessageCount()
+ public long getMessageCount()
{
- return (Integer)proxy.retrieveAttributeValue("messageCount");
+ return (Long)proxy.retrieveAttributeValue("messageCount");
}
public long getMessagesAdded()
Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -69,9 +69,9 @@
return (Integer)proxy.invokeOperation("changeMessagesPriority", filter, newPriority);
}
- public int countMessages(final String filter) throws Exception
+ public long countMessages(final String filter) throws Exception
{
- return (Integer)proxy.invokeOperation("countMessages", filter);
+ return (Long)proxy.invokeOperation("countMessages", filter);
}
public boolean expireMessage(final long messageID) throws Exception
@@ -114,9 +114,9 @@
return (String)proxy.retrieveAttributeValue("filter");
}
- public int getMessageCount()
+ public long getMessageCount()
{
- return (Integer)proxy.retrieveAttributeValue("messageCount");
+ return (Long)proxy.retrieveAttributeValue("messageCount");
}
public long getMessagesAdded()
Modified: trunk/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -134,7 +134,7 @@
while ((msg = iterator.next()) != null)
{
assertEquals(key++, msg.getMessage().getIntProperty("key").intValue());
- cursor.ack(msg.getPosition());
+ cursor.confirmPosition(msg.getPosition());
}
assertEquals(NUM_MESSAGES, key);
@@ -225,7 +225,7 @@
assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
assertTrue(msg.getMessage().getBooleanProperty("even").booleanValue());
key += 2;
- cursorEven.ack(msg.getPosition());
+ cursorEven.confirmPosition(msg.getPosition());
}
assertEquals(NUM_MESSAGES, key);
@@ -235,7 +235,7 @@
assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
assertFalse(msg.getMessage().getBooleanProperty("even").booleanValue());
key += 2;
- cursorOdd.ack(msg.getPosition());
+ cursorOdd.confirmPosition(msg.getPosition());
}
assertEquals(NUM_MESSAGES + 1, key);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-12-16 23:20:10 UTC (rev 10048)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-12-17 02:20:26 UTC (rev 10049)
@@ -334,7 +334,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#getMessageCount()
*/
- public int getMessageCount()
+ public long getMessageCount()
{
// TODO Auto-generated method stub
return 0;
14 years, 2 months
JBoss hornetq SVN: r10048 - in trunk: src/main/org/hornetq/core/paging/cursor/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-16 18:20:10 -0500 (Thu, 16 Dec 2010)
New Revision: 10048
Modified:
trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
Log:
HORNETQ-574 more pagecounters stuff
Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2010-12-16 22:29:27 UTC (rev 10047)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2010-12-16 23:20:10 UTC (rev 10048)
@@ -40,7 +40,6 @@
/**
*
- * This method is also used by Journal.loadMessageJournal
* @param id
* @param variance
*/
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 22:29:27 UTC (rev 10047)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 23:20:10 UTC (rev 10048)
@@ -50,7 +50,7 @@
// the journal record id that is holding the current value
private long recordID = -1;
- private final boolean persistent;
+ private boolean persistent;
private final StorageManager storage;
@@ -100,19 +100,20 @@
*/
public void increment(Transaction tx, int add) throws Exception
{
- tx.setContainsPersistent();
- if (!persistent)
+ if (persistent)
{
- replayIncrement(tx, -1, add);
+ tx.setContainsPersistent();
+ long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
+ replayIncrement(tx, id, add);
}
else
{
- long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
-
- replayIncrement(tx, id, add);
+ replayIncrement(tx, -1, add);
}
+
+
}
/**
@@ -199,6 +200,12 @@
incrementRecords.add(id);
}
}
+
+ /** used on testing only */
+ public void setPersistent(final boolean persistent)
+ {
+ this.persistent = persistent;
+ }
/** This method sould alwas be called from a single threaded executor */
protected void cleanup()
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-16 22:29:27 UTC (rev 10047)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-16 23:20:10 UTC (rev 10048)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
+import org.hornetq.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -145,6 +146,69 @@
}
}
+
+ public void testCleanupCounterNonPersistent() throws Exception
+ {
+ ClientSessionFactory sf = sl.createSessionFactory();
+ ClientSession session = sf.createSession();
+
+ try
+ {
+ Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+
+ PageSubscriptionCounter counter = locateCounter(queue);
+
+ ((PageSubscriptionCounterImpl)counter).setPersistent(false);
+
+ StorageManager storage = server.getStorageManager();
+
+ Transaction tx = new TransactionImpl(server.getStorageManager());
+
+ for (int i = 0 ; i < 2100; i++)
+ {
+
+ counter.increment(tx, 1);
+
+ if (i % 200 == 0)
+ {
+ tx.commit();
+
+ storage.waitOnOperations();
+
+ assertEquals(i + 1, counter.getValue());
+
+ tx = new TransactionImpl(server.getStorageManager());
+ }
+ }
+
+ tx.commit();
+
+ storage.waitOnOperations();
+
+ assertEquals(2100, counter.getValue());
+
+ server.stop();
+
+ server = newHornetQServer();
+
+ server.start();
+
+ queue = server.locateQueue(new SimpleString("A1"));
+
+ assertNotNull(queue);
+
+ counter = locateCounter(queue);
+
+ assertEquals(0, counter.getValue());
+
+ }
+ finally
+ {
+ sf.close();
+ session.close();
+ }
+ }
+
public void testRestartCounter() throws Exception
{
Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
14 years, 2 months
JBoss hornetq SVN: r10047 - trunk/src/main/org/hornetq/core/paging/cursor/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-16 17:29:27 -0500 (Thu, 16 Dec 2010)
New Revision: 10047
Modified:
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
Log:
HORNETQ-574 more pagecounters stuff
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 22:14:18 UTC (rev 10046)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 22:29:27 UTC (rev 10047)
@@ -41,17 +41,15 @@
// Constants -----------------------------------------------------
static final Logger log = Logger.getLogger(PageSubscriptionCounterImpl.class);
+ // Attributes ----------------------------------------------------
- // Attributes ----------------------------------------------------
-
- // TODO: making this configurable
private static final int FLUSH_COUNTER = 1000;
private final long subscriptionID;
-
+
// the journal record id that is holding the current value
private long recordID = -1;
-
+
private final boolean persistent;
private final StorageManager storage;
@@ -59,11 +57,11 @@
private final AtomicLong value = new AtomicLong(0);
private final LinkedList<Long> incrementRecords = new LinkedList<Long>();
-
+
private LinkedList<Pair<Long, Integer>> loadList;
private final Executor executor;
-
+
private final Runnable cleanupCheck = new Runnable()
{
public void run()
@@ -71,14 +69,17 @@
cleanup();
}
};
-
+
// protected LinkedList
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public PageSubscriptionCounterImpl(final StorageManager storage, final boolean persistent, final long subscriptionID, final Executor executor)
+ public PageSubscriptionCounterImpl(final StorageManager storage,
+ final boolean persistent,
+ final long subscriptionID,
+ final Executor executor)
{
this.subscriptionID = subscriptionID;
this.storage = storage;
@@ -100,11 +101,18 @@
public void increment(Transaction tx, int add) throws Exception
{
tx.setContainsPersistent();
-
- long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
- replayIncrement(tx, id, add);
+ if (!persistent)
+ {
+ replayIncrement(tx, -1, add);
+ }
+ else
+ {
+ long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
+ replayIncrement(tx, id, add);
+ }
+
}
/**
@@ -126,7 +134,7 @@
oper.operations.add(new ItemOper(this, recordID, add));
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#loadValue(long, long)
*/
@@ -135,8 +143,6 @@
this.value.set(value);
this.recordID = recordID;
}
-
-
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#incrementProcessed(long, int)
@@ -158,9 +164,9 @@
{
if (loadList == null)
{
- loadList = new LinkedList<Pair<Long,Integer>>();
+ loadList = new LinkedList<Pair<Long, Integer>>();
}
-
+
loadList.add(new Pair<Long, Integer>(id, add));
}
@@ -187,14 +193,18 @@
public void addInc(long id, int variance)
{
value.addAndGet(variance);
- incrementRecords.add(id);
+
+ if (id >= 0)
+ {
+ incrementRecords.add(id);
+ }
}
/** This method sould alwas be called from a single threaded executor */
protected void cleanup()
{
ArrayList<Long> deleteList;
-
+
long valueReplace;
synchronized (this)
{
@@ -203,33 +213,33 @@
deleteList.addAll(incrementRecords);
incrementRecords.clear();
}
-
+
long newRecordID = -1;
long txCleanup = storage.generateUniqueID();
-
+
try
{
for (Long value : deleteList)
{
storage.deleteIncrementRecord(txCleanup, value);
}
-
+
if (recordID >= 0)
{
storage.deletePageCounter(txCleanup, recordID);
}
-
- newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace);
-
+
+ newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace);
+
storage.commit(txCleanup);
-
+
storage.waitOnOperations();
}
catch (Exception e)
{
newRecordID = recordID;
-
+
log.warn(e.getMessage(), e);
try
{
14 years, 2 months
JBoss hornetq SVN: r10046 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-16 17:14:18 -0500 (Thu, 16 Dec 2010)
New Revision: 10046
Modified:
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
Log:
HORNETQ-574 more pagecounters stuff
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 19:47:43 UTC (rev 10045)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 22:14:18 UTC (rev 10046)
@@ -169,13 +169,16 @@
*/
public void processReload()
{
- for (Pair<Long, Integer> incElement : loadList)
+ if (loadList != null)
{
- value.addAndGet(incElement.b);
- incrementRecords.add(incElement.a);
+ for (Pair<Long, Integer> incElement : loadList)
+ {
+ value.addAndGet(incElement.b);
+ incrementRecords.add(incElement.a);
+ }
+ loadList.clear();
+ loadList = null;
}
- loadList.clear();
- loadList = null;
}
/* (non-Javadoc)
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-16 19:47:43 UTC (rev 10045)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-16 22:14:18 UTC (rev 10046)
@@ -1064,7 +1064,7 @@
if (sub != null)
{
- sub.getCounter().loadValue(record.id, encoding.value);
+ sub.getCounter().loadInc(record.id, encoding.value);
}
else
{
@@ -1127,6 +1127,11 @@
}
loadPreparedTransactions(postOffice, pagingManager, resourceManager, queues, queueInfos, preparedTransactions, duplicateIDMap, pageSubscriptions);
+
+ for (PageSubscription sub: pageSubscriptions.values())
+ {
+ sub.getCounter().processReload();
+ }
for (LargeServerMessage msg : largeMessages)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-16 19:47:43 UTC (rev 10045)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-16 22:14:18 UTC (rev 10046)
@@ -13,14 +13,14 @@
package org.hornetq.tests.integration.paging;
+import javax.transaction.xa.Xid;
+
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
-import org.hornetq.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -85,6 +85,66 @@
}
}
+ public void testCleanupCounter() throws Exception
+ {
+ ClientSessionFactory sf = sl.createSessionFactory();
+ ClientSession session = sf.createSession();
+
+ try
+ {
+ Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+
+ PageSubscriptionCounter counter = locateCounter(queue);
+
+ StorageManager storage = server.getStorageManager();
+
+ Transaction tx = new TransactionImpl(server.getStorageManager());
+
+ for (int i = 0 ; i < 2100; i++)
+ {
+
+ counter.increment(tx, 1);
+
+ if (i % 200 == 0)
+ {
+ tx.commit();
+
+ storage.waitOnOperations();
+
+ assertEquals(i + 1, counter.getValue());
+
+ tx = new TransactionImpl(server.getStorageManager());
+ }
+ }
+
+ tx.commit();
+
+ storage.waitOnOperations();
+
+ assertEquals(2100, counter.getValue());
+
+ server.stop();
+
+ server = newHornetQServer();
+
+ server.start();
+
+ queue = server.locateQueue(new SimpleString("A1"));
+
+ assertNotNull(queue);
+
+ counter = locateCounter(queue);
+
+ assertEquals(2100, counter.getValue());
+
+ }
+ finally
+ {
+ sf.close();
+ session.close();
+ }
+ }
+
public void testRestartCounter() throws Exception
{
Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
@@ -112,13 +172,13 @@
server = newHornetQServer();
server.start();
-
+
queue = server.locateQueue(new SimpleString("A1"));
-
+
assertNotNull(queue);
-
+
counter = locateCounter(queue);
-
+
assertEquals(1, counter.getValue());
}
@@ -141,34 +201,52 @@
public void testPrepareCounter() throws Exception
{
- ClientSessionFactory sf = sl.createSessionFactory();
- ClientSession session = sf.createSession();
+ Xid xid = newXID();
- try
- {
- Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+ Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
- PageSubscriptionCounter counter = locateCounter(queue);
+ PageSubscriptionCounter counter = locateCounter(queue);
- StorageManager storage = server.getStorageManager();
+ StorageManager storage = server.getStorageManager();
- Transaction tx = new TransactionImpl(server.getStorageManager());
+ Transaction tx = new TransactionImpl(xid, server.getStorageManager(), 300);
+ for (int i = 0 ; i < 2000; i++)
+ {
counter.increment(tx, 1);
+ }
- assertEquals(0, counter.getValue());
+ assertEquals(0, counter.getValue());
- tx.commit();
+ tx.prepare();
- storage.waitOnOperations();
+ storage.waitOnOperations();
- assertEquals(1, counter.getValue());
- }
- finally
- {
- sf.close();
- session.close();
- }
+ assertEquals(0, counter.getValue());
+
+ server.stop();
+
+ server = newHornetQServer();
+
+ server.start();
+
+ queue = server.locateQueue(new SimpleString("A1"));
+
+ assertNotNull(queue);
+
+ counter = locateCounter(queue);
+
+ tx = server.getResourceManager().removeTransaction(xid);
+
+ assertNotNull(tx);
+
+ assertEquals(0, counter.getValue());
+
+ tx.commit(false);
+
+ assertEquals(2000, counter.getValue());
+
+
}
// Package protected ---------------------------------------------
14 years, 2 months
JBoss hornetq SVN: r10045 - in trunk: src/main/org/hornetq/core/paging/cursor/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-16 14:47:43 -0500 (Thu, 16 Dec 2010)
New Revision: 10045
Modified:
trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
Log:
Page Counters commit 2
Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2010-12-16 15:49:32 UTC (rev 10044)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2010-12-16 19:47:43 UTC (rev 10045)
@@ -25,20 +25,25 @@
public interface PageSubscriptionCounter
{
- public abstract long getValue();
+ long getValue();
- public abstract void increment(Transaction tx, int add) throws Exception;
+ void increment(Transaction tx, int add) throws Exception;
- public abstract void loadValue(final long recordValueID, final long value);
+ void loadValue(final long recordValueID, final long value);
+
+ void loadInc(final long recordInd, final int add);
+
+ void replayIncrement(Transaction tx, long recordID, int add);
+
+ /** This will process the reload */
+ void processReload();
- public abstract void incrementProcessed(long id, int variance);
-
/**
*
* This method is also used by Journal.loadMessageJournal
* @param id
* @param variance
*/
- public abstract void addInc(long id, int variance);
+ void addInc(long id, int variance);
}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 15:49:32 UTC (rev 10044)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 19:47:43 UTC (rev 10045)
@@ -19,6 +19,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
+import org.hornetq.api.core.Pair;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
import org.hornetq.core.persistence.StorageManager;
@@ -58,6 +59,8 @@
private final AtomicLong value = new AtomicLong(0);
private final LinkedList<Long> incrementRecords = new LinkedList<Long>();
+
+ private LinkedList<Pair<Long, Integer>> loadList;
private final Executor executor;
@@ -96,6 +99,22 @@
*/
public void increment(Transaction tx, int add) throws Exception
{
+ tx.setContainsPersistent();
+
+ long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
+
+ replayIncrement(tx, id, add);
+
+ }
+
+ /**
+ * This method will install the prepared TXs
+ * @param tx
+ * @param recordID
+ * @param add
+ */
+ public void replayIncrement(Transaction tx, long recordID, int add)
+ {
CounterOperations oper = (CounterOperations)tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC);
if (oper == null)
@@ -105,18 +124,16 @@
tx.addOperation(oper);
}
- long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
-
- oper.operations.add(new ItemOper(this, id, add));
-
+ oper.operations.add(new ItemOper(this, recordID, add));
}
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#loadValue(long, long)
*/
- public synchronized void loadValue(final long recordValueID, final long value)
+ public synchronized void loadValue(final long recordID, final long value)
{
this.value.set(value);
+ this.recordID = recordID;
}
@@ -124,9 +141,9 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#incrementProcessed(long, int)
*/
- public synchronized void incrementProcessed(long id, int variance)
+ public synchronized void incrementProcessed(long id, int add)
{
- addInc(id, variance);
+ addInc(id, add);
if (incrementRecords.size() > FLUSH_COUNTER)
{
executor.execute(cleanupCheck);
@@ -135,6 +152,33 @@
}
/* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageSubscriptionCounter#loadInc(long, int)
+ */
+ public void loadInc(long id, int add)
+ {
+ if (loadList == null)
+ {
+ loadList = new LinkedList<Pair<Long,Integer>>();
+ }
+
+ loadList.add(new Pair<Long, Integer>(id, add));
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageSubscriptionCounter#processReload()
+ */
+ public void processReload()
+ {
+ for (Pair<Long, Integer> incElement : loadList)
+ {
+ value.addAndGet(incElement.b);
+ incrementRecords.add(incElement.a);
+ }
+ loadList.clear();
+ loadList = null;
+ }
+
+ /* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#addInc(long, int)
*/
public void addInc(long id, int variance)
@@ -211,14 +255,14 @@
static class ItemOper
{
- public ItemOper(PageSubscriptionCounter counter, long id, int add)
+ public ItemOper(PageSubscriptionCounterImpl counter, long id, int add)
{
this.counter = counter;
this.id = id;
this.ammount = add;
}
- PageSubscriptionCounter counter;
+ PageSubscriptionCounterImpl counter;
long id;
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-16 15:49:32 UTC (rev 10044)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-16 19:47:43 UTC (rev 10045)
@@ -52,8 +52,10 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.GroupingInfo;
@@ -138,9 +140,9 @@
public static final byte HEURISTIC_COMPLETION = 38;
public static final byte ACKNOWLEDGE_CURSOR = 39;
-
+
public static final byte PAGE_CURSOR_COUNTER_VALUE = 40;
-
+
public static final byte PAGE_CURSOR_COUNTER_INC = 41;
private UUID persistentID;
@@ -278,7 +280,7 @@
}
else
{
- idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);
+ idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);
}
Journal localMessage = new JournalImpl(config.getJournalFileSize(),
config.getJournalMinFiles(),
@@ -440,7 +442,7 @@
}
LargeServerMessageImpl largeMessage = (LargeServerMessageImpl)createLargeMessage();
-
+
largeMessage.copyHeadersAndProperties(message);
largeMessage.setMessageID(id);
@@ -496,16 +498,18 @@
syncNonTransactional,
getContext(syncNonTransactional));
}
-
+
public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
{
- long ackID = idGenerator.generateID();
- position.setRecordID(ackID);
- messageJournal.appendAddRecord(ackID, ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position), syncNonTransactional, getContext(syncNonTransactional));
+ long ackID = idGenerator.generateID();
+ position.setRecordID(ackID);
+ messageJournal.appendAddRecord(ackID,
+ ACKNOWLEDGE_CURSOR,
+ new CursorAckRecordEncoding(queueID, position),
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
-
-
public void deleteMessage(final long messageID) throws Exception
{
// Messages are deleted on postACK, one after another.
@@ -592,8 +596,7 @@
{
messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
- new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
- depages),
+ new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
syncNonTransactional,
getContext(syncNonTransactional));
}
@@ -621,9 +624,11 @@
{
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
- messageJournal.appendAddRecordTransactional(txID, ackID, ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position));
+ messageJournal.appendAddRecordTransactional(txID,
+ ackID,
+ ACKNOWLEDGE_CURSOR,
+ new CursorAckRecordEncoding(queueID, position));
}
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#deleteCursorAcknowledgeTransactional(long, long)
@@ -633,7 +638,6 @@
messageJournal.appendDeleteRecordTransactional(txID, ackID);
}
-
public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
{
long id = generateUniqueID();
@@ -807,6 +811,8 @@
Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long, Map<Long, AddMessageRecord>>();
+ Map<Long, PageSubscription> pageSubscriptions = new HashMap<Long, PageSubscription>();
+
final int totalSize = records.size();
for (int reccount = 0; reccount < totalSize; reccount++)
@@ -1011,25 +1017,62 @@
{
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
-
+
encoding.position.setRecordID(record.id);
-
- QueueBindingInfo queueInfo = queueInfos.get(encoding.queueID);
-
- if (queueInfo != null)
+
+ PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+
+ if (sub != null)
{
- SimpleString address = queueInfo.getAddress();
- PagingStore store = pagingManager.getPageStore(address);
- PageSubscription cursor = store.getCursorProvier().getSubscription(encoding.queueID);
- cursor.reloadACK(encoding.position);
+ sub.reloadACK(encoding.position);
}
else
{
log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
}
-
+
break;
}
+ case PAGE_CURSOR_COUNTER_VALUE:
+ {
+ PageCountRecord encoding = new PageCountRecord();
+
+ encoding.decode(buff);
+
+ PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+
+ if (sub != null)
+ {
+ sub.getCounter().loadValue(record.id, encoding.value);
+ }
+ else
+ {
+ log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ }
+
+ break;
+ }
+
+ case PAGE_CURSOR_COUNTER_INC:
+ {
+ PageCountRecordInc encoding = new PageCountRecordInc();
+
+ encoding.decode(buff);
+
+
+ PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+
+ if (sub != null)
+ {
+ sub.getCounter().loadValue(record.id, encoding.value);
+ }
+ else
+ {
+ log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ }
+
+ break;
+ }
default:
{
throw new IllegalStateException("Invalid record type " + recordType);
@@ -1083,7 +1126,7 @@
}
}
- loadPreparedTransactions(postOffice, pagingManager, resourceManager, queues, preparedTransactions, duplicateIDMap);
+ loadPreparedTransactions(postOffice, pagingManager, resourceManager, queues, queueInfos, preparedTransactions, duplicateIDMap, pageSubscriptions);
for (LargeServerMessage msg : largeMessages)
{
@@ -1110,7 +1153,7 @@
}
}
}
-
+
// To recover positions on Iterators
if (pagingManager != null)
{
@@ -1132,6 +1175,35 @@
return info;
}
+ /**
+ * @param queueID
+ * @param pageSubscriptions
+ * @param queueInfos
+ * @return
+ */
+ private PageSubscription locateSubscription(final long queueID,
+ final Map<Long, PageSubscription> pageSubscriptions,
+ final Map<Long, QueueBindingInfo> queueInfos,
+ final PagingManager pagingManager) throws Exception
+ {
+
+ PageSubscription subs = pageSubscriptions.get(queueID);
+ if (subs == null)
+ {
+ QueueBindingInfo queueInfo = queueInfos.get(queueID);
+
+ if (queueInfo != null)
+ {
+ SimpleString address = queueInfo.getAddress();
+ PagingStore store = pagingManager.getPageStore(address);
+ subs = store.getCursorProvier().getSubscription(queueID);
+ pageSubscriptions.put(queueID, subs);
+ }
+ }
+
+ return subs;
+ }
+
// grouping handler operations
public void addGrouping(final GroupBinding groupBinding) throws Exception
{
@@ -1170,8 +1242,6 @@
{
bindingsJournal.appendDeleteRecord(queueBindingID, true);
}
-
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#storePageCounterAdd(long, long, int)
@@ -1179,18 +1249,23 @@
public long storePageCounterInc(long txID, long queueID, int value) throws Exception
{
long recordID = idGenerator.generateID();
- messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.PAGE_CURSOR_COUNTER_INC, new PageCountRecord(queueID, value));
+ messageJournal.appendAddRecordTransactional(txID,
+ recordID,
+ JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
+ new PageCountRecordInc(queueID, value));
return recordID;
}
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
*/
public long storePageCounter(long txID, long queueID, long value) throws Exception
{
long recordID = idGenerator.generateID();
- messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value));
+ messageJournal.appendAddRecordTransactional(txID,
+ recordID,
+ JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
+ new PageCountRecord(queueID, value));
return recordID;
}
@@ -1209,9 +1284,7 @@
{
messageJournal.appendDeleteRecordTransactional(txID, recordID);
}
-
-
-
+
public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
final List<GroupingInfo> groupingInfos) throws Exception
{
@@ -1458,8 +1531,10 @@
final PagingManager pagingManager,
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
+ final Map<Long, QueueBindingInfo> queueInfos,
final List<PreparedTransactionInfo> preparedTransactions,
- final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+ final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
+ final Map<Long, PageSubscription> pageSubscriptions) throws Exception
{
// recover prepared transactions
for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
@@ -1609,6 +1684,34 @@
// and make sure the rollback will work well also
break;
}
+ case PAGE_CURSOR_COUNTER_VALUE:
+ {
+ log.warn("PAGE_CURSOR_COUNTER_VALUE record used on a prepared statement, what shouldn't happen");
+
+ break;
+ }
+
+ case PAGE_CURSOR_COUNTER_INC:
+ {
+ PageCountRecordInc encoding = new PageCountRecordInc();
+
+ encoding.decode(buff);
+
+
+ PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+
+ if (sub != null)
+ {
+ sub.getCounter().replayIncrement(tx, record.id, encoding.value);
+ }
+ else
+ {
+ log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ }
+
+ break;
+ }
+
default:
{
JournalStorageManager.log.warn("InternalError: Record type " + recordType +
@@ -2297,23 +2400,23 @@
}
}
-
+
private static final class PageCountRecord implements EncodingSupport
{
-
+
PageCountRecord()
{
-
+
}
-
+
PageCountRecord(long queueID, long value)
{
this.queueID = queueID;
this.value = value;
}
-
+
long queueID;
-
+
long value;
/* (non-Javadoc)
@@ -2341,10 +2444,55 @@
queueID = buffer.readLong();
value = buffer.readLong();
}
-
-
+
}
+ private static final class PageCountRecordInc implements EncodingSupport
+ {
+
+ PageCountRecordInc()
+ {
+
+ }
+
+ PageCountRecordInc(long queueID, int value)
+ {
+ this.queueID = queueID;
+ this.value = value;
+ }
+
+ long queueID;
+
+ int value;
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeLong(queueID);
+ buffer.writeInt(value);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(HornetQBuffer buffer)
+ {
+ queueID = buffer.readLong();
+ value = buffer.readInt();
+ }
+
+ }
+
private static final class AddMessageRecord
{
public AddMessageRecord(final ServerMessage message)
@@ -2366,7 +2514,7 @@
this.queueID = queueID;
this.position = position;
}
-
+
public CursorAckRecordEncoding()
{
this.position = new PagePositionImpl();
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-16 15:49:32 UTC (rev 10044)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-16 19:47:43 UTC (rev 10045)
@@ -119,7 +119,7 @@
counter = locateCounter(queue);
- //assertEquals(1, counter.getValue());
+ assertEquals(1, counter.getValue());
}
14 years, 2 months
JBoss hornetq SVN: r10044 - in trunk: src/main/org/hornetq/core/paging/cursor/impl and 8 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-16 10:49:32 -0500 (Thu, 16 Dec 2010)
New Revision: 10044
Added:
trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
Removed:
trunk/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java
Modified:
trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
trunk/src/main/org/hornetq/core/persistence/StorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
PageCounters first commit
Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -33,7 +33,10 @@
// To be called before the server is down
void stop();
+ // TODO: this method is only used on testcases and can go away
void bookmark(PagePosition position) throws Exception;
+
+ PageSubscriptionCounter getCounter();
long getId();
Added: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java (rev 0)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.core.transaction.Transaction;
+
+/**
+ * A PagingSubscriptionCounterInterface
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public interface PageSubscriptionCounter
+{
+
+ public abstract long getValue();
+
+ public abstract void increment(Transaction tx, int add) throws Exception;
+
+ public abstract void loadValue(final long recordValueID, final long value);
+
+ public abstract void incrementProcessed(long id, int variance);
+
+ /**
+ *
+ * This method is also used by Journal.loadMessageJournal
+ * @param id
+ * @param variance
+ */
+ public abstract void addInc(long id, int variance);
+
+}
\ No newline at end of file
Added: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java (rev 0)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -0,0 +1,287 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor.impl;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.transaction.TransactionPropertyIndexes;
+
+/**
+ * This class will encapsulate the persistent counters for the PagingSubscription
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PageSubscriptionCounterImpl implements PageSubscriptionCounter
+{
+
+ // Constants -----------------------------------------------------
+ static final Logger log = Logger.getLogger(PageSubscriptionCounterImpl.class);
+
+
+ // Attributes ----------------------------------------------------
+
+ // TODO: making this configurable
+ private static final int FLUSH_COUNTER = 1000;
+
+ private final long subscriptionID;
+
+ // the journal record id that is holding the current value
+ private long recordID = -1;
+
+ private final boolean persistent;
+
+ private final StorageManager storage;
+
+ private final AtomicLong value = new AtomicLong(0);
+
+ private final LinkedList<Long> incrementRecords = new LinkedList<Long>();
+
+ private final Executor executor;
+
+ private final Runnable cleanupCheck = new Runnable()
+ {
+ public void run()
+ {
+ cleanup();
+ }
+ };
+
+ // protected LinkedList
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PageSubscriptionCounterImpl(final StorageManager storage, final boolean persistent, final long subscriptionID, final Executor executor)
+ {
+ this.subscriptionID = subscriptionID;
+ this.storage = storage;
+ this.executor = executor;
+ this.persistent = persistent;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#getValue()
+ */
+ public long getValue()
+ {
+ return value.get();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#increment(org.hornetq.core.transaction.Transaction, int)
+ */
+ public void increment(Transaction tx, int add) throws Exception
+ {
+ CounterOperations oper = (CounterOperations)tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC);
+
+ if (oper == null)
+ {
+ oper = new CounterOperations();
+ tx.putProperty(TransactionPropertyIndexes.PAGE_COUNT_INC, oper);
+ tx.addOperation(oper);
+ }
+
+ long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
+
+ oper.operations.add(new ItemOper(this, id, add));
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#loadValue(long, long)
+ */
+ public synchronized void loadValue(final long recordValueID, final long value)
+ {
+ this.value.set(value);
+ }
+
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#incrementProcessed(long, int)
+ */
+ public synchronized void incrementProcessed(long id, int variance)
+ {
+ addInc(id, variance);
+ if (incrementRecords.size() > FLUSH_COUNTER)
+ {
+ executor.execute(cleanupCheck);
+ }
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.impl.PagingSubscriptionCounterInterface#addInc(long, int)
+ */
+ public void addInc(long id, int variance)
+ {
+ value.addAndGet(variance);
+ incrementRecords.add(id);
+ }
+
+ /** This method sould alwas be called from a single threaded executor */
+ protected void cleanup()
+ {
+ ArrayList<Long> deleteList;
+
+ long valueReplace;
+ synchronized (this)
+ {
+ valueReplace = value.get();
+ deleteList = new ArrayList<Long>(incrementRecords.size());
+ deleteList.addAll(incrementRecords);
+ incrementRecords.clear();
+ }
+
+ long newRecordID = -1;
+
+ long txCleanup = storage.generateUniqueID();
+
+ try
+ {
+ for (Long value : deleteList)
+ {
+ storage.deleteIncrementRecord(txCleanup, value);
+ }
+
+ if (recordID >= 0)
+ {
+ storage.deletePageCounter(txCleanup, recordID);
+ }
+
+ newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace);
+
+ storage.commit(txCleanup);
+
+ storage.waitOnOperations();
+ }
+ catch (Exception e)
+ {
+ newRecordID = recordID;
+
+ log.warn(e.getMessage(), e);
+ try
+ {
+ storage.rollback(txCleanup);
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ finally
+ {
+ recordID = newRecordID;
+ }
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ static class ItemOper
+ {
+
+ public ItemOper(PageSubscriptionCounter counter, long id, int add)
+ {
+ this.counter = counter;
+ this.id = id;
+ this.ammount = add;
+ }
+
+ PageSubscriptionCounter counter;
+
+ long id;
+
+ int ammount;
+ }
+
+ static class CounterOperations implements TransactionOperation
+ {
+ LinkedList<ItemOper> operations = new LinkedList<ItemOper>();
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforePrepare(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterPrepare(Transaction tx)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforeCommit(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterCommit(Transaction tx)
+ {
+ for (ItemOper oper : operations)
+ {
+ oper.counter.incrementProcessed(oper.id, oper.ammount);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforeRollback(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterRollback(Transaction tx)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
+ */
+ public List<MessageReference> getRelatedMessageReferences()
+ {
+ return null;
+ }
+ }
+
+}
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -39,6 +39,7 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.MessageReference;
@@ -98,6 +99,8 @@
private List<PagePosition> recoveredACK;
private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
+
+ private final PageSubscriptionCounter counter;
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
@@ -121,6 +124,7 @@
this.executor = executor;
this.filter = filter;
this.persistent = persistent;
+ this.counter = new PageSubscriptionCounterImpl(store, persistent, cursorId, executor);
}
// Public --------------------------------------------------------
@@ -167,6 +171,11 @@
ack(position);
}
+ public PageSubscriptionCounter getCounter()
+ {
+ return counter;
+ }
+
public void scheduleCleanupCheck()
{
if (autoCleanup)
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -39,7 +39,6 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
-import org.hornetq.utils.UUID;
/**
*
@@ -190,4 +189,21 @@
void deleteSecurityRoles(SimpleString addressMatch) throws Exception;
List<PersistedRoles> recoverPersistedRoles() throws Exception;
+
+ /**
+ * @return The ID with the stored counter
+ */
+ long storePageCounter(long txID, long queueID, long value) throws Exception;
+
+ void deleteIncrementRecord(long txID, long recordID) throws Exception;
+
+ void deletePageCounter(long txID, long recordID) throws Exception;
+
+ /**
+ * @return the ID with the increment record
+ * @throws Exception
+ */
+ long storePageCounterInc(long txID, long queueID, int add) throws Exception;
+
+
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -52,8 +52,8 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
-import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.GroupingInfo;
@@ -75,9 +75,9 @@
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.ExecutorFactory;
@@ -138,6 +138,10 @@
public static final byte HEURISTIC_COMPLETION = 38;
public static final byte ACKNOWLEDGE_CURSOR = 39;
+
+ public static final byte PAGE_CURSOR_COUNTER_VALUE = 40;
+
+ public static final byte PAGE_CURSOR_COUNTER_INC = 41;
private UUID persistentID;
@@ -1166,7 +1170,48 @@
{
bindingsJournal.appendDeleteRecord(queueBindingID, true);
}
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounterAdd(long, long, int)
+ */
+ public long storePageCounterInc(long txID, long queueID, int value) throws Exception
+ {
+ long recordID = idGenerator.generateID();
+ messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.PAGE_CURSOR_COUNTER_INC, new PageCountRecord(queueID, value));
+ return recordID;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
+ */
+ public long storePageCounter(long txID, long queueID, long value) throws Exception
+ {
+ long recordID = idGenerator.generateID();
+ messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value));
+ return recordID;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long, long)
+ */
+ public void deleteIncrementRecord(long txID, long recordID) throws Exception
+ {
+ messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deletePageCounter(long, long)
+ */
+ public void deletePageCounter(long txID, long recordID) throws Exception
+ {
+ messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ }
+
+
+
public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
final List<GroupingInfo> groupingInfos) throws Exception
{
@@ -2253,7 +2298,53 @@
}
+ private static final class PageCountRecord implements EncodingSupport
+ {
+
+ PageCountRecord()
+ {
+
+ }
+
+ PageCountRecord(long queueID, long value)
+ {
+ this.queueID = queueID;
+ this.value = value;
+ }
+
+ long queueID;
+
+ long value;
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_LONG * 2;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeLong(queueID);
+ buffer.writeLong(value);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(HornetQBuffer buffer)
+ {
+ queueID = buffer.readLong();
+ value = buffer.readLong();
+ }
+
+
+ }
+
private static final class AddMessageRecord
{
public AddMessageRecord(final ServerMessage message)
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -471,4 +471,40 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
+ */
+ public long storePageCounter(long txID, long queueID, long value) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long, long)
+ */
+ public void deleteIncrementRecord(long txID, long recordID) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deletePageCounter(long, long)
+ */
+ public void deletePageCounter(long txID, long recordID) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, long, int)
+ */
+ public long storePageCounterInc(long txID, long queueID, int add) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -135,6 +135,8 @@
SimpleString filterString,
boolean durable,
boolean temporary) throws Exception;
+
+ Queue locateQueue(SimpleString queueName) throws Exception;
void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -22,9 +22,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
- import java.util.Map.Entry;
+import java.util.Map.Entry;
import java.util.Set;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -83,7 +82,15 @@
import org.hornetq.core.security.Role;
import org.hornetq.core.security.SecurityStore;
import org.hornetq.core.security.impl.SecurityStoreImpl;
-import org.hornetq.core.server.*;
+import org.hornetq.core.server.ActivateCallback;
+import org.hornetq.core.server.Bindable;
+import org.hornetq.core.server.Divert;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.MemoryManager;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.Transformer;
import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
@@ -956,6 +963,20 @@
{
return createQueue(address, queueName, filterString, durable, temporary, false);
}
+
+ public Queue locateQueue(SimpleString queueName) throws Exception
+ {
+ Binding binding = postOffice.getBinding(queueName);
+
+ Bindable queue = binding.getBindable();
+
+ if (!(queue instanceof Queue))
+ {
+ throw new IllegalStateException("locateQueue should only be used to locate queues");
+ }
+
+ return (Queue) binding.getBindable();
+ }
public Queue deployQueue(final SimpleString address,
final SimpleString queueName,
Modified: trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -25,6 +25,8 @@
public class TransactionPropertyIndexes
{
+ public static final int PAGE_COUNT_INC = 3;
+
public static final int PAGE_TRANSACTION_UPDATE = 4;
public static final int PAGE_TRANSACTION = 5;
Deleted: trunk/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagePositionTest.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -1,67 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.paging;
-
-import org.hornetq.tests.util.ServiceTestBase;
-
-/**
- * A PageCursorTest
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class PagePositionTest extends ServiceTestBase
-{
-
- // Test what would happen on redelivery situations
- public void testRedeliverLike()
- {
-
- }
-
- public void testRedeliverPersistence()
- {
-
- }
-
- public void testDeletePagesAfterRedelivery()
- {
-
- }
-
- public void testNextAfterPosition()
- {
-
- }
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Added: trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingCounterTest.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.paging;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
+import org.hornetq.core.paging.cursor.impl.PageSubscriptionCounterImpl;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A PagingCounterTest
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PagingCounterTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private HornetQServer server;
+
+ private ServerLocator sl;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testCounter() throws Exception
+ {
+ ClientSessionFactory sf = sl.createSessionFactory();
+ ClientSession session = sf.createSession();
+
+ try
+ {
+ Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+
+ PageSubscriptionCounter counter = locateCounter(queue);
+
+ StorageManager storage = server.getStorageManager();
+
+ Transaction tx = new TransactionImpl(server.getStorageManager());
+
+ counter.increment(tx, 1);
+
+ assertEquals(0, counter.getValue());
+
+ tx.commit();
+
+ storage.waitOnOperations();
+
+ assertEquals(1, counter.getValue());
+ }
+ finally
+ {
+ sf.close();
+ session.close();
+ }
+ }
+
+ public void testRestartCounter() throws Exception
+ {
+ Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+
+ PageSubscriptionCounter counter = locateCounter(queue);
+
+ StorageManager storage = server.getStorageManager();
+
+ Transaction tx = new TransactionImpl(server.getStorageManager());
+
+ counter.increment(tx, 1);
+
+ assertEquals(0, counter.getValue());
+
+ tx.commit();
+
+ storage.waitOnOperations();
+
+ assertEquals(1, counter.getValue());
+
+ sl.close();
+
+ server.stop();
+
+ server = newHornetQServer();
+
+ server.start();
+
+ queue = server.locateQueue(new SimpleString("A1"));
+
+ assertNotNull(queue);
+
+ counter = locateCounter(queue);
+
+ //assertEquals(1, counter.getValue());
+
+ }
+
+ /**
+ * @param queue
+ * @return
+ * @throws Exception
+ */
+ private PageSubscriptionCounter locateCounter(Queue queue) throws Exception
+ {
+ PageSubscription subscription = server.getPagingManager()
+ .getPageStore(new SimpleString("A1"))
+ .getCursorProvier()
+ .getSubscription(queue.getID());
+
+ PageSubscriptionCounter counter = subscription.getCounter();
+ return counter;
+ }
+
+ public void testPrepareCounter() throws Exception
+ {
+ ClientSessionFactory sf = sl.createSessionFactory();
+ ClientSession session = sf.createSession();
+
+ try
+ {
+ Queue queue = server.createQueue(new SimpleString("A1"), new SimpleString("A1"), null, true, false);
+
+ PageSubscriptionCounter counter = locateCounter(queue);
+
+ StorageManager storage = server.getStorageManager();
+
+ Transaction tx = new TransactionImpl(server.getStorageManager());
+
+ counter.increment(tx, 1);
+
+ assertEquals(0, counter.getValue());
+
+ tx.commit();
+
+ storage.waitOnOperations();
+
+ assertEquals(1, counter.getValue());
+ }
+ finally
+ {
+ sf.close();
+ session.close();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ server = newHornetQServer();
+
+ server.start();
+
+ sl = createInVMNonHALocator();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ sl.close();
+
+ server.stop();
+
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ private HornetQServer newHornetQServer()
+ {
+ HornetQServer server = super.createServer(true, false);
+
+ AddressSettings defaultSetting = new AddressSettings();
+ defaultSetting.setPageSizeBytes(10 * 1024);
+ defaultSetting.setMaxSizeBytes(20 * 1024);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ return server;
+ }
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-12-16 15:13:18 UTC (rev 10043)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-12-16 15:49:32 UTC (rev 10044)
@@ -1590,6 +1590,42 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
+ */
+ public long storePageCounter(long txID, long queueID, long value) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long, long)
+ */
+ public void deleteIncrementRecord(long txID, long recordID) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deletePageCounter(long, long)
+ */
+ public void deletePageCounter(long txID, long recordID) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, long, int)
+ */
+ public long storePageCounterInc(long txID, long queueID, int add) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
14 years, 2 months
JBoss hornetq SVN: r10043 - trunk/docs/eap-manual/en.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-12-16 10:13:18 -0500 (Thu, 16 Dec 2010)
New Revision: 10043
Modified:
trunk/docs/eap-manual/en/clusters.xml
Log:
updated eap manual
Modified: trunk/docs/eap-manual/en/clusters.xml
===================================================================
--- trunk/docs/eap-manual/en/clusters.xml 2010-12-16 13:33:25 UTC (rev 10042)
+++ trunk/docs/eap-manual/en/clusters.xml 2010-12-16 15:13:18 UTC (rev 10043)
@@ -34,12 +34,8 @@
<title>Colocated Live and Backup in Symmetrical cluster</title>
<para>
The colocated symmetrical topology will be the most widely used topology, this is where an EAP instance has
- a
- live
- node running plus 1 or
- more backup nodes. Each backup node will belong to a live node on another EAP instance. In a simple cluster
- of
- 2
+ a live node running plus 1 or more backup nodes. Each backup node will belong to a live node on another EAP
+ instance. In a simple cluster o 2
EAP instances this would mean that each EAP instance would have a live server and 1 backup server as in
diagram1.
</para>
@@ -51,13 +47,16 @@
configured
with 2 backups, 1 for each of the other live servers, or you may just want to have 1 backup for each live.
</para>
+ <para>
+ The reason for having the backup server colocated is so they work with MDB's, when a back up server comes ive
+ it forwards any messages to the live server who deals with them in the normal fashion. If your application was
+ pure JMS you could, if chosen, use a dedicated backup server.
+ </para>
<section>
<title>Configuration</title>
<para>
First lets start with the configuration of the live server, we will use the EAP 'all' configuration as
- our
- starting
- point. Since this version only supports shared store for failover we need to configure this in the
+ our starting point. Since this version only supports shared store for failover we need to configure this in the
<literal>hornetq-configuration.xml</literal>
file like so:
</para>
@@ -74,16 +73,16 @@
something like:
</para>
<programlisting>
- <large-messages-directory>/media/shared/data/large-messages</large-messages-directory>
- <bindings-directory>/media/shared/data/bindings</bindings-directory>
- <journal-directory>/media/shared/data/journal</journal-directory>
- <paging-directory>/media/shared/data/paging</paging-directory>
+ <large-messages-directory>/media/shared/data/large-messages</large-messages-directory>
+ <bindings-directory>/media/shared/data/bindings</bindings-directory>
+ <journal-directory>/media/shared/data/journal</journal-directory>
+ <paging-directory>/media/shared/data/paging</paging-directory>
</programlisting>
<para>
How these paths are configured will of course depend on your network settings or file system.
</para>
<para>
- Now we need to configure how remote JMS clients wull behave if the server is shutdown in a normal
+ Now we need to configure how remote JMS clients will behave if the server is shutdown in a normal
fashion.
By
default
@@ -97,7 +96,7 @@
file like so:
</para>
<programlisting>
- <failover-on-shutdown>false</failover-on-shutdown>
+ <failover-on-shutdown>false</failover-on-shutdown>
</programlisting>
<para>Don't worry if you have this set to false (which is the default) but still want failover to occur,
simply
@@ -124,44 +123,43 @@
directory but in reality it doesn't matter where this is put. This will look like:
</para>
<programlisting>
- <?xml version="1.0" encoding="UTF-8"?>
+ <?xml version="1.0" encoding="UTF-8"?>
- <deployment xmlns="urn:jboss:bean-deployer:2.0">
+ <deployment xmlns="urn:jboss:bean-deployer:2.0">
- <!-- The core configuration -->
- <bean name="BackupConfiguration" class="org.hornetq.core.config.impl.FileConfiguration">
- <property
- name="configurationUrl">${jboss.server.home.url}/deploy/hornetq-backup1/hornetq-configuration.xml</property>
- </bean>
+ <!-- The core configuration -->
+ <bean name="BackupConfiguration" class="org.hornetq.core.config.impl.FileConfiguration">
+ <property name="configurationUrl">${jboss.server.home.url}/deploy/hornetq-backup1/hornetq-configuration.xml</property>
+ </bean>
- <!-- The core server -->
- <bean name="BackupHornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
- <constructor>
- <parameter>
+ <!-- The core server -->
+ <bean name="BackupHornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
<inject bean="BackupConfiguration"/>
- </parameter>
- <parameter>
+ </parameter>
+ <parameter>
<inject bean="MBeanServer"/>
- </parameter>
- <parameter>
+ </parameter>
+ <parameter>
<inject bean="HornetQSecurityManager"/>
- </parameter>
- </constructor>
- <start ignored="true"/>
- <stop ignored="true"/>
- </bean>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
- <!-- The JMS server -->
- <bean name="BackupJMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
- <constructor>
- <parameter>
+ <!-- The JMS server -->
+ <bean name="BackupJMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
<inject bean="BackupHornetQServer"/>
- </parameter>
- </constructor>
- </bean>
+ </parameter>
+ </constructor>
+ </bean>
- </deployment>
+ </deployment>
</programlisting>
<para>
The first thing to notice is the BackupConfiguration bean. This is configured to pick up the
@@ -199,6 +197,8 @@
<clustered>true</clustered>
+ <backup>true</backup>
+
<shared-store>true</shared-store>
<allow-failback>true</allow-failback>
@@ -321,6 +321,10 @@
to avoid naming clashes with the live server
</para>
<para>
+ The first important part of the configuration is to make sure that this server starts as a backup server not
+ a live server, via the <literal>backup</literal> attribute.
+ </para>
+ <para>
After that we have the same cluster configuration as live, that is <literal>clustered</literal> is true and
<literal>shared-store</literal> is true. However you can see we have added a new configuration element
<literal>allow-failback</literal>. When this is set to true then this backup server will automatically stop
@@ -377,6 +381,11 @@
</section>
<section>
<title>Dedicated Live and Backup in Symmetrical cluster</title>
+ <para>
+ In reality the configuration for this is exactly the same as in the previous section, the only difference is
+ that a backup will reside on an eap instance of its own. of course this means that the eap instance is passive
+ and not used until the backup comes live and is only really useful for pure JMS applications.
+ </para>
</section>
</section>
</chapter>
\ No newline at end of file
14 years, 2 months
JBoss hornetq SVN: r10042 - in trunk/docs: eap-manual and 1 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-12-16 08:33:25 -0500 (Thu, 16 Dec 2010)
New Revision: 10042
Added:
trunk/docs/eap-manual/
trunk/docs/eap-manual/build.bat
trunk/docs/eap-manual/build.sh
trunk/docs/eap-manual/build.xml
trunk/docs/eap-manual/en/
trunk/docs/eap-manual/en/clusters.xml
trunk/docs/eap-manual/en/master.xml
Log:
added eap manual
Copied: trunk/docs/eap-manual/build.bat (from rev 10039, trunk/docs/user-manual/build.bat)
===================================================================
--- trunk/docs/eap-manual/build.bat (rev 0)
+++ trunk/docs/eap-manual/build.bat 2010-12-16 13:33:25 UTC (rev 10042)
@@ -0,0 +1,13 @@
+@echo off
+
+set "OVERRIDE_ANT_HOME=..\..\tools\ant"
+
+if exist "..\..\src\bin\build.bat" (
+ rem running from TRUNK
+ call ..\..\src\bin\build.bat %*
+) else (
+ rem running from the distro
+ call ..\..\bin\build.bat %*
+)
+
+set "OVERRIDE_ANT_HOME="
Copied: trunk/docs/eap-manual/build.sh (from rev 10039, trunk/docs/user-manual/build.sh)
===================================================================
--- trunk/docs/eap-manual/build.sh (rev 0)
+++ trunk/docs/eap-manual/build.sh 2010-12-16 13:33:25 UTC (rev 10042)
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+OVERRIDE_ANT_HOME=../../tools/ant
+export OVERRIDE_ANT_HOME
+
+if [ -f "../../src/bin/build.sh" ]; then
+ # running from TRUNK
+ ../../src/bin/build.sh "$@"
+else
+ # running from the distro
+ ../../bin/build.sh "$@"
+fi
+
+
+
Copied: trunk/docs/eap-manual/build.xml (from rev 10039, trunk/docs/user-manual/build.xml)
===================================================================
--- trunk/docs/eap-manual/build.xml (rev 0)
+++ trunk/docs/eap-manual/build.xml 2010-12-16 13:33:25 UTC (rev 10042)
@@ -0,0 +1,30 @@
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<project name="HornetQ User Manual" default="all" basedir=".">
+
+ <property name="build.dir" value="build"/>
+ <property name="pdf.name" value="HornetQ_EAPManual.pdf"/>
+ <import file="${basedir}/../../lib/docbook-support/support.xml"/>
+
+ <target name="all" depends="clean">
+ <mkdir dir="en/images" />
+ <antcall target="lang.all"><param name="lang" value="en"/></antcall>
+ </target>
+
+ <target name="html.doc" description="creates the html docs only and opens a browser">
+ <mkdir dir="en/images" />
+ <antcall target="lang.dochtml"><param name="lang" value="en"/></antcall>
+ </target>
+
+</project>
Copied: trunk/docs/eap-manual/en/clusters.xml (from rev 10039, trunk/docs/quickstart-guide/en/about.xml)
===================================================================
--- trunk/docs/eap-manual/en/clusters.xml (rev 0)
+++ trunk/docs/eap-manual/en/clusters.xml 2010-12-16 13:33:25 UTC (rev 10042)
@@ -0,0 +1,382 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<chapter id="clusters">
+ <title>HornetQ and EAP Cluster Configuration</title>
+ <section>
+ <title>Configuring Failover</title>
+ <para>
+ This chapter explains how to configure HornetQ within EAP with live backup-groups. Currently in this version
+ HornetQ only supports shared store for backup nodes so we assume that in the rest of this chapter.
+ </para>
+ <para>There are 2 main ways to configure HornetQ servers to have a backup server:</para>
+ <itemizedlist>
+ <listitem>
+ <para>Colocated. This is when an EAP instance has both a live and backup(s) running.</para>
+ </listitem>
+ <listitem>
+ <para>Dedicated. This is when an EAP instance has either a live or backup running but never both.</para>
+ </listitem>
+ </itemizedlist>
+ <section>
+ <title>Colocated Live and Backup in Symmetrical cluster</title>
+ <para>
+ The colocated symmetrical topology will be the most widely used topology, this is where an EAP instance has
+ a
+ live
+ node running plus 1 or
+ more backup nodes. Each backup node will belong to a live node on another EAP instance. In a simple cluster
+ of
+ 2
+ EAP instances this would mean that each EAP instance would have a live server and 1 backup server as in
+ diagram1.
+ </para>
+ <para>todo add image</para>
+ <para>
+ With more thn 2 servers it is up to the user as to how many backups per live server are configured, you can
+ have
+ as many backups as required but usually 1 would suffice. In 3 node topology you may have each EAP instance
+ configured
+ with 2 backups, 1 for each of the other live servers, or you may just want to have 1 backup for each live.
+ </para>
+ <section>
+ <title>Configuration</title>
+ <para>
+ First lets start with the configuration of the live server, we will use the EAP 'all' configuration as
+ our
+ starting
+ point. Since this version only supports shared store for failover we need to configure this in the
+ <literal>hornetq-configuration.xml</literal>
+ file like so:
+ </para>
+ <programlisting>
+ <shared-store>true</shared-store>
+ </programlisting>
+ <para>
+ Obviously this means that the location of the journal files etc will have to be configured to be some
+ where
+ where
+ this lives backup can access. You may change the lives configuration in
+ <literal>hornetq-configuration.xml</literal>
+ to
+ something like:
+ </para>
+ <programlisting>
+ <large-messages-directory>/media/shared/data/large-messages</large-messages-directory>
+ <bindings-directory>/media/shared/data/bindings</bindings-directory>
+ <journal-directory>/media/shared/data/journal</journal-directory>
+ <paging-directory>/media/shared/data/paging</paging-directory>
+ </programlisting>
+ <para>
+ How these paths are configured will of course depend on your network settings or file system.
+ </para>
+ <para>
+ Now we need to configure how remote JMS clients wull behave if the server is shutdown in a normal
+ fashion.
+ By
+ default
+ Clients will not failover if the live server is shutdown. Depending on there connection factory settings
+ they will either fail or try to reconnect to the live server.
+ </para>
+ <para>If you want clients to failover on a normal server shutdown the you must configure the
+ <literal>failover-on-shutdown</literal>
+ flag to true in the
+ <literal>hornetq-configuration.xml</literal>
+ file like so:
+ </para>
+ <programlisting>
+ <failover-on-shutdown>false</failover-on-shutdown>
+ </programlisting>
+ <para>Don't worry if you have this set to false (which is the default) but still want failover to occur,
+ simply
+ kill
+ the
+ server process directly or call
+ <literal>forceFailover</literal>
+ via jmx or the admin console on the core server object.
+ </para>
+ <para>
+ No lets look at how to create and configure a backup server on the same node, lets assume that this
+ backups
+ live
+ server is configured identically to the live server on this node for simplicities sake.
+ </para>
+ <para>
+ Firstly we need to define a new HornetQ Server that EAP will deploy. We do this by creating a new
+ <literal>hornetq-jboss-beans.xml</literal>
+ configuration. We will place this under a new directory
+ <literal>hornetq-backup1</literal>
+ which will need creating
+ in the
+ <literal>deploy</literal>
+ directory but in reality it doesn't matter where this is put. This will look like:
+ </para>
+ <programlisting>
+ <?xml version="1.0" encoding="UTF-8"?>
+
+ <deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <!-- The core configuration -->
+ <bean name="BackupConfiguration" class="org.hornetq.core.config.impl.FileConfiguration">
+ <property
+ name="configurationUrl">${jboss.server.home.url}/deploy/hornetq-backup1/hornetq-configuration.xml</property>
+ </bean>
+
+
+ <!-- The core server -->
+ <bean name="BackupHornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="BackupConfiguration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="BackupJMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="BackupHornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+ </deployment>
+ </programlisting>
+ <para>
+ The first thing to notice is the BackupConfiguration bean. This is configured to pick up the
+ configuration
+ for
+ the
+ server which we will place in the same directory.
+ </para>
+ <para>
+ After that we just configure a new HornetQ Server and JMS server.
+ </para>
+ <note>
+ <para>
+ Notice that the names of the beans have been changed from that of the live servers configuration. This
+ is
+ so
+ there is no clash. Obviously if you add more backup servers you will need to rename those as well,
+ backup1,
+ backup2 etc.
+ </para>
+ </note>
+ <para>
+ Now lets add the server configuration in
+ <literal>hornetq-configuration.xml</literal>
+ and add it to the same directory
+ <literal>deploy/hornetq-backup1</literal>
+ and configure it like so:
+ </para>
+ <programlisting>
+ <configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+
+ <jmx-domain>org.hornetq.backup1</jmx-domain>
+
+ <clustered>true</clustered>
+
+ <shared-store>true</shared-store>
+
+ <allow-failback>true</allow-failback>
+
+ <log-delegate-factory-class-name>org.hornetq.integration.logging.Log4jLogDelegateFactory</log-delegate-factory-class-name>
+
+ <bindings-directory>${jboss.server.data.dir}/hornetq-backup/bindings</bindings-directory>
+
+ <journal-directory>${jboss.server.data.dir}/hornetq-backup/journal</journal-directory>
+
+ <journal-min-files>10</journal-min-files>
+
+ <large-messages-directory>${jboss.server.data.dir}/hornetq-backup/largemessages</large-messages-directory>
+
+ <paging-directory>${jboss.server.data.dir}/hornetq/paging</paging-directory>
+
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5446}"/>
+ </connector>
+
+ <!--The connetor to the live node that corresponds to this backup-->
+ <connector name="my-live-connector">
+ <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
+ <param key="host" value="my-live-host"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5445}"/>
+ </connector>
+
+ <!--invm connector added by th elive server on this node, used by the bridges-->
+ <connector name="in-vm">
+ <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
+ <param key="server-id" value="${hornetq.server-id:0}"/>
+ </connector>
+
+ </connectors>
+
+ <acceptors>
+ <acceptor name="netty">
+ <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
+ <param key="host" value="${jboss.bind.address:localhost}"/>
+ <param key="port" value="${hornetq.remoting.netty.port:5446}"/>
+ </acceptor>
+ </acceptors>
+
+ <broadcast-groups>
+ <broadcast-group name="bg-group1">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <broadcast-period>1000</broadcast-period>
+ <connector-ref>netty-connector</connector-ref>
+ </broadcast-group>
+ </broadcast-groups>
+
+ <discovery-groups>
+ <discovery-group name="dg-group1">
+ <group-address>231.7.7.7</group-address>
+ <group-port>9876</group-port>
+ <refresh-timeout>60000</refresh-timeout>
+ </discovery-group>
+ </discovery-groups>
+
+ <cluster-connections>
+ <cluster-connection name="my-cluster">
+ <address>jms</address>
+ <connector-ref>netty-connector</connector-ref>
+ <discovery-group-ref discovery-group-name="dg-group1"/>
+ </cluster-connection>
+ </cluster-connections>
+
+ <!-- We need to create a core queue for the JMS queue explicitly because the bridge will be deployed
+ before the JMS queue is deployed, so the first time, it otherwise won't find the queue -->
+ <queues>
+ <queue name="jms.queue.testQueue">
+ <address>jms.queue.testQueue</address>
+ </queue>
+ </queues>
+ <!-- We set-up a bridge that forwards from a the queue on this node to the same address on the live
+ node.
+ -->
+ <bridges>
+ <bridge name="testQueueBridge">
+ <queue-name>jms.queue.testQueue</queue-name>
+ <forwarding-address>jms.queue.testQueue</forwarding-address>
+ <reconnect-attempts>-1</reconnect-attempts>
+ <static-connectors>
+ <connector-ref>in-vm</connector-ref>
+ </static-connectors>
+ </bridge>
+ </bridges>
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>jms.queue.DLQ</dead-letter-address>
+ <expiry-address>jms.queue.ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <max-size-bytes>10485760</max-size-bytes>
+ <message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>BLOCK</address-full-policy>
+ </address-setting>
+ </address-settings>
+
+ </configuration>
+
+ </programlisting>
+ <para>
+ The first thing you can see is we have added a <literal>jmx-domain</literal> attribute, this is used when
+ adding objects, such as the HornetQ server and JMS server to jmx, we change this from the default <literal>org.hornetq</literal>
+ to avoid naming clashes with the live server
+ </para>
+ <para>
+ After that we have the same cluster configuration as live, that is <literal>clustered</literal> is true and
+ <literal>shared-store</literal> is true. However you can see we have added a new configuration element
+ <literal>allow-failback</literal>. When this is set to true then this backup server will automatically stop
+ and fall back into backup node if failover occurs and the live server has become available. If false then
+ the user will have to stop the server manually.
+ </para>
+ <para>
+ Next we can see the configuration for the journal location, as in the live configuration this must point to
+ the same directory as this backup's live server.
+ </para>
+ <para>
+ Now we see the connectors configuration, we have 3 defined which are needed for the following
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>netty-connector.</literal> This is the connector used to connect to this backup server once live.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>my-live-connector.</literal> This is the connector to the live server that this backup is paied to.
+ It is used by the cluster connection to announce its presence as a backup and to form the cluster when
+ this backup becomes live. In reality it doesn't matter what connector the cluster connection uses, it
+ could actually use the invm connector and broadcast its presence via the server on this node if we wanted.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>in-vm.</literal> This is the invm connector that is created by the live server on the same
+ node. We will use this to create a bridge to the live server to forward messages to.
+ </para>
+ </listitem>
+ </itemizedlist>
+ <para>After that you will see the acceptors defined, This is the acceptor where clients will reconnect.</para>
+ <para>
+ The Broadcast groups, Discovery group and cluster configurations are as per normal, details of these
+ can be found in the HornetQ user manual.
+ </para>
+ <para>
+ The next part is of interest, here we define a list of queues and bridges. These must match any queues
+ and addresses used by MDB's in the live servers configuration. At this point these must be statically
+ defined but this may change in future versions. Basically fow every queue or topic definition you need a
+ queue configuration using the correct prefix <literal>jms.queue(topic)</literal> if using jm and a bridge
+ definition that handles the forwarding of any message.
+ </para>
+ <note>
+ <para>
+ There is no such thing as a topic in core HornetQ, this is basically just an address so we need to create
+ a queue that matches the jms address, that is, <literal>jms.topic.testTopic</literal>.
+ </para>
+ </note>
+ </section>
+ </section>
+ <section>
+ <title>Dedicated Live and Backup in Symmetrical cluster</title>
+ </section>
+ </section>
+</chapter>
\ No newline at end of file
Copied: trunk/docs/eap-manual/en/master.xml (from rev 10039, trunk/docs/quickstart-guide/en/master.xml)
===================================================================
--- trunk/docs/eap-manual/en/master.xml (rev 0)
+++ trunk/docs/eap-manual/en/master.xml 2010-12-16 13:33:25 UTC (rev 10042)
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.3CR3//EN"
+ "../../../lib/docbook-support/support/docbook-dtd/docbookx.dtd" [
+ <!ENTITY clusters SYSTEM "clusters.xml">
+ ]>
+<book lang="en">
+ <bookinfo>
+ <title>HornetQ EAP Guide</title>
+ <subtitle>Putting the buzz in messaging</subtitle>
+ </bookinfo>
+
+ <toc></toc>
+
+ &clusters;
+</book>
14 years, 2 months