JBoss hornetq SVN: r9803 - branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-20 17:09:02 -0400 (Wed, 20 Oct 2010)
New Revision: 9803
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
Log:
tweak
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-20 20:50:34 UTC (rev 9802)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-20 21:09:02 UTC (rev 9803)
@@ -125,7 +125,7 @@
if (position.getMessageNr() > 0)
{
- cursorInfo.confirmed.addAndGet(position.getMessageNr() - 1);
+ cursorInfo.confirmed.addAndGet(position.getMessageNr());
}
ack(position);
13 years, 6 months
JBoss hornetq SVN: r9802 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-20 16:50:34 -0400 (Wed, 20 Oct 2010)
New Revision: 9802
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Adding bookmarking for starting a cursor on a later position
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-20 14:00:18 UTC (rev 9801)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-20 20:50:34 UTC (rev 9802)
@@ -32,6 +32,8 @@
// To be called before the server is down
void stop();
+ void bookmark(PagePosition position) throws Exception;
+
/** It will be 0 if non persistent cursor */
public long getId();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-20 14:00:18 UTC (rev 9801)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-20 20:50:34 UTC (rev 9802)
@@ -74,6 +74,8 @@
// to be used on tests -------------------------------------------
int getCacheSize();
+
+ void printDebug();
// Package protected ---------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-20 14:00:18 UTC (rev 9801)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-20 20:50:34 UTC (rev 9802)
@@ -39,7 +39,6 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
@@ -110,9 +109,28 @@
public PageCursorProvider getProvider()
{
- return this.cursorProvider;
+ return cursorProvider;
}
+ public void bookmark(PagePosition position) throws Exception
+ {
+ if (lastPosition != null)
+ {
+ throw new RuntimeException("Bookmark can only be done at the time of the cursor's creation");
+ }
+
+ lastPosition = position;
+
+ PageCursorInfo cursorInfo = getPageInfo(position);
+
+ if (position.getMessageNr() > 0)
+ {
+ cursorInfo.confirmed.addAndGet(position.getMessageNr() - 1);
+ }
+
+ ack(position);
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
@@ -195,7 +213,6 @@
installTXCallback(tx, position);
}
-
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#getFirstPage()
@@ -212,7 +229,6 @@
}
}
-
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
*/
@@ -242,27 +258,24 @@
installTXCallback(tx, position);
}
-
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#positionIgnored(org.hornetq.core.paging.cursor.PagePosition)
*/
- public void positionIgnored(PagePosition position)
+ public void positionIgnored(final PagePosition position)
{
processACK(position);
}
-
-
+
/**
* All the data associated with the cursor should go away here
*/
- public void close() throws Exception
+ public void close() throws Exception
{
final long tx = store.generateUniqueID();
-
+
final ArrayList<Exception> ex = new ArrayList<Exception>();
-
+
final AtomicBoolean isPersistent = new AtomicBoolean(false);
-
// We can't delete the records at the caller's thread
// because an executor may be holding the synchronized on PageCursorImpl
@@ -295,24 +308,24 @@
catch (Exception e)
{
ex.add(e);
- log.warn(e.getMessage(), e);
+ PageCursorImpl.log.warn(e.getMessage(), e);
}
}
});
-
+
Future future = new Future();
-
+
executor.execute(future);
-
+
while (!future.await(5000))
{
- log.warn("Timeout on waiting cursor " + this + " to be closed");
+ PageCursorImpl.log.warn("Timeout on waiting cursor " + this + " to be closed");
}
-
-
+
if (isPersistent.get())
{
- // Another reason to perform the commit at the main thread is because the OperationContext may only send the result to the client when
+ // Another reason to perform the commit at the main thread is because the OperationContext may only send the
+ // result to the client when
// the IO on commit is done
if (ex.size() == 0)
{
@@ -324,10 +337,9 @@
throw ex.get(0);
}
}
-
+
cursorProvider.close(this);
}
-
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#getId()
@@ -337,8 +349,6 @@
return cursorId;
}
-
-
public void processReload() throws Exception
{
if (recoveredACK != null)
@@ -397,29 +407,29 @@
previousPos = pos;
}
- this.lastAckedPosition = lastPosition;
+ lastAckedPosition = lastPosition;
recoveredACK.clear();
recoveredACK = null;
}
}
-
+
public void stop()
{
Future future = new Future();
executor.execute(future);
while (!future.await(1000))
{
- log.warn("Waiting page cursor to finish executors - " + this);
+ PageCursorImpl.log.warn("Waiting page cursor to finish executors - " + this);
}
}
public void printDebug()
{
- printDebug(this.toString());
+ printDebug(toString());
}
-
- public void printDebug(String msg)
+
+ public void printDebug(final String msg)
{
System.out.println("Debug information on PageCurorImpl- " + msg);
for (PageCursorInfo info : consumedPages.values())
@@ -469,7 +479,7 @@
// there's a different page being acked, we will do the check right away
scheduleCleanupCheck();
}
- this.lastAckedPosition = pos;
+ lastAckedPosition = pos;
}
PageCursorInfo info = getPageInfo(pos);
@@ -550,7 +560,7 @@
{
if (entry.getKey() == lastAckedPosition.getPageNr())
{
- trace("We can't clear page " + entry.getKey() + " now since it's the current page");
+ PageCursorImpl.trace("We can't clear page " + entry.getKey() + " now since it's the current page");
}
else
{
@@ -588,7 +598,7 @@
{
executor.execute(new Runnable()
{
-
+
public void run()
{
synchronized (PageCursorImpl.this)
@@ -601,12 +611,13 @@
}
if (consumedPages.remove(completePage.getPageId()) == null)
{
- log.warn("Couldn't remove page " + completePage.getPageId() + " from consumed pages on cursor for address " + pageStore.getAddress());
+ PageCursorImpl.log.warn("Couldn't remove page " + completePage.getPageId() +
+ " from consumed pages on cursor for address " +
+ pageStore.getAddress());
}
- }
+ }
}
-
-
+
cursorProvider.scheduleCleanup();
}
});
@@ -638,11 +649,12 @@
// We're holding this object to avoid delete the pages before the IO is complete,
// however we can't delete these records again
private boolean pendingDelete;
-
+
// We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or
// expressions
private final AtomicInteger confirmed = new AtomicInteger(0);
+ @Override
public String toString()
{
return "PageCursorInfo::PageID=" + pageId + " numberOfMessage = " + numberOfMessages;
@@ -663,15 +675,15 @@
{
return getNumberOfMessages() == confirmed.get();
}
-
+
public boolean isPendingDelete()
{
return pendingDelete;
}
-
+
public void setPendingDelete()
{
- this.pendingDelete = true;
+ pendingDelete = true;
}
/**
@@ -699,9 +711,13 @@
pageId);
}
- if (getNumberOfMessages() == confirmed.incrementAndGet())
+ // Negative could mean a bookmark on the first element for the page (example -1)
+ if (posACK.getMessageNr() >= 0)
{
- onPageDone(this);
+ if (getNumberOfMessages() == confirmed.incrementAndGet())
+ {
+ onPageDone(this);
+ }
}
}
@@ -729,7 +745,7 @@
}
- static class PageCursorTX implements TransactionOperation
+ static class PageCursorTX extends TransactionOperationAbstract
{
HashMap<PageCursorImpl, List<PagePosition>> pendingPositions = new HashMap<PageCursorImpl, List<PagePosition>>();
@@ -747,29 +763,9 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
- */
- public void beforePrepare(final Transaction tx) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
- */
- public void afterPrepare(final Transaction tx)
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
- */
- public void beforeCommit(final Transaction tx) throws Exception
- {
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
*/
+ @Override
public void afterCommit(final Transaction tx)
{
for (Entry<PageCursorImpl, List<PagePosition>> entry : pendingPositions.entrySet())
@@ -786,19 +782,6 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
- */
- public void beforeRollback(final Transaction tx) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
- */
- public void afterRollback(final Transaction tx)
- {
- }
}
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-20 14:00:18 UTC (rev 9801)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-20 20:50:34 UTC (rev 9802)
@@ -112,7 +112,7 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(), server.getExecutorFactory());
+ PageCursorProviderImpl cursorProvider = (PageCursorProviderImpl)createNonPersistentCursor();
PageCursor cursor = cursorProvider.createNonPersistentCursor();
@@ -134,6 +134,16 @@
}
+ /**
+ * @return
+ * @throws Exception
+ */
+ private PageCursor createNonPersistentCursor() throws Exception
+ {
+ return lookupCursorProvider().createNonPersistentCursor();
+ }
+
+
public void testReadNextPage() throws Exception
{
@@ -143,12 +153,22 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(), server.getExecutorFactory());
+ PageCursorProvider cursorProvider = lookupCursorProvider();
PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2,0));
assertNull(cache);
}
+
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ private PageCursorProvider lookupCursorProvider() throws Exception
+ {
+ return lookupPageStore(ADDRESS).getCursorProvier();
+ }
public void testRestart() throws Exception
@@ -159,7 +179,7 @@
System.out.println("Number of pages = " + numberOfPages);
- PageCursorProviderImpl cursorProvider = (PageCursorProviderImpl)this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ PageCursorProvider cursorProvider = lookupCursorProvider();
PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
@@ -480,7 +500,7 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProvider cursorProvider = lookupPageStore(ADDRESS).getCursorProvier();
+ PageCursorProvider cursorProvider = lookupCursorProvider();
PageCursor cursor = cursorProvider.createNonPersistentCursor();
PageCursorImpl cursor2 = (PageCursorImpl)cursorProvider.createNonPersistentCursor();
@@ -527,7 +547,43 @@
public void testFirstMessageInTheMiddle() throws Exception
{
+
+ final int NUM_MESSAGES = 100;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = lookupCursorProvider();
+ PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
+
+ PageCursor cursor = cursorProvider.createNonPersistentCursor();
+ PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages()/2);
+ cursor.bookmark(startingPos);
+ PagedMessage msg = cache.getMessage(startingPos.getMessageNr() + 1);
+ msg.initMessage(server.getStorageManager());
+ int key = msg.getMessage().getIntProperty("key").intValue();
+
+ msg = null;
+
+ cache = null;
+
+ Pair<PagePosition, PagedMessage> msgCursor = null;
+ while ((msgCursor = cursor.moveNext()) != null)
+ {
+ assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msgCursor.a);
+ }
+ assertEquals(NUM_MESSAGES, key);
+
+
+ forceGC();
+
+ assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+ server.stop();
+
}
private int addMessages(final int numMessages, final int messageSize) throws Exception
13 years, 6 months
JBoss hornetq SVN: r9801 - in branches/hornetq-416: src/main/org/hornetq/core/management/impl and 5 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-10-20 10:00:18 -0400 (Wed, 20 Oct 2010)
New Revision: 9801
Modified:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java
branches/hornetq-416/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/hornetq-416/src/main/org/hornetq/core/server/HornetQServer.java
branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java
branches/hornetq-416/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
Log:
more management operations
Modified: branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-10-20 11:26:32 UTC (rev 9800)
+++ branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-10-20 14:00:18 UTC (rev 9801)
@@ -251,4 +251,23 @@
*/
@Operation(desc = "List all JMS consumers associated to a JMS Connection")
String listConsumersAsJSON(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID) throws Exception;
+
+ /**
+ * Lists all addresses to which the designated server session has sent messages.
+ */
+ @Operation(desc = "Lists all addresses to which the designated session has sent messages", impact = MBeanOperationInfo.INFO)
+ String[] listTargetDestinations(@Parameter(desc = "a session ID", name = "sessionID") String sessionID) throws Exception;
+
+ /**
+ * Returns the last sent message's ID from the given session to an address.
+ */
+ @Operation(desc = "Returns the last sent message's ID from the given session to an address", impact = MBeanOperationInfo.INFO)
+ String getLastSentMessageID(@Parameter(desc = "session name", name = "sessionID") String sessionID,
+ @Parameter(desc = "address", name = "address") String address) throws Exception;
+
+ /**
+ * Gets the session's creation time.
+ */
+ @Operation(desc = "Gets the sessions creation time", impact = MBeanOperationInfo.INFO)
+ String getSessionCreationTime(@Parameter(desc = "session name", name = "sessionID") String sessionID) throws Exception;
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-10-20 11:26:32 UTC (rev 9800)
+++ branches/hornetq-416/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-10-20 14:00:18 UTC (rev 9801)
@@ -1755,4 +1755,13 @@
}
}
+ public String[] listTargetAddresses(String sessionID)
+ {
+ ServerSession session = server.getSessionByID(sessionID);
+ if (session != null) {
+ return session.getTargetAddresses();
+ }
+ return new String[0];
+ }
+
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/HornetQServer.java 2010-10-20 11:26:32 UTC (rev 9800)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/HornetQServer.java 2010-10-20 14:00:18 UTC (rev 9801)
@@ -159,4 +159,6 @@
void deployBridge(BridgeConfiguration config) throws Exception;
void destroyBridge(String name) throws Exception;
+
+ ServerSession getSessionByID(String sessionID);
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java 2010-10-20 11:26:32 UTC (rev 9800)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java 2010-10-20 14:00:18 UTC (rev 9801)
@@ -117,4 +117,10 @@
void addMetaData(String key, String data);
String getMetaData(String key);
+
+ String[] getTargetAddresses();
+
+ String getLastSentMessageID(String address);
+
+ long getCreationTime();
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-20 11:26:32 UTC (rev 9800)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-20 14:00:18 UTC (rev 9801)
@@ -1513,6 +1513,11 @@
});
}
+
+ public ServerSession getSessionByID(String sessionName)
+ {
+ return sessions.get(sessionName);
+ }
// Inner classes
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-10-20 11:26:32 UTC (rev 9800)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-10-20 14:00:18 UTC (rev 9801)
@@ -19,6 +19,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -65,6 +66,7 @@
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.UUID;
/*
* Session implementation
@@ -139,6 +141,10 @@
private Map<String, String> metaData;
+ private Map<SimpleString, UUID> targetAddressInfos = new HashMap<SimpleString, UUID>();
+
+ private long creationTime = System.currentTimeMillis();
+
// Constructors ---------------------------------------------------------------------------------
public ServerSessionImpl(final String name,
@@ -1181,6 +1187,8 @@
}
postOffice.route(msg, routingContext, direct);
+
+ targetAddressInfos.put(msg.getAddress(), msg.getUserID());
routingContext.clear();
}
@@ -1203,5 +1211,29 @@
}
return data;
}
+
+ public String[] getTargetAddresses()
+ {
+ Map<SimpleString, UUID> copy = new HashMap<SimpleString, UUID>(targetAddressInfos);
+ Iterator<SimpleString> iter = copy.keySet().iterator();
+ int num = copy.keySet().size();
+ String[] addresses = new String[num];
+ int i = 0;
+ while (iter.hasNext())
+ {
+ addresses[i] = iter.next().toString();
+ i++;
+ }
+ return addresses;
+ }
+ public String getLastSentMessageID(String address)
+ {
+ return targetAddressInfos.get(SimpleString.toSimpleString(address)).toString();
+ }
+
+ public long getCreationTime()
+ {
+ return this.creationTime;
+ }
}
Modified: branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-10-20 11:26:32 UTC (rev 9800)
+++ branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-10-20 14:00:18 UTC (rev 9801)
@@ -586,6 +586,7 @@
private void addSessionMetaData(ClientSession session) throws HornetQException
{
+ session.addMetaData("jms-session", "");
if (clientID != null)
{
session.addMetaData("jms-client-id", clientID);
Modified: branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-20 11:26:32 UTC (rev 9800)
+++ branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-20 14:00:18 UTC (rev 9801)
@@ -34,6 +34,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.jms.management.ConnectionFactoryControl;
+import org.hornetq.api.jms.management.DestinationControl;
import org.hornetq.api.jms.management.JMSQueueControl;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.api.jms.management.TopicControl;
@@ -43,7 +44,6 @@
import org.hornetq.core.server.ServerSession;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQQueue;
-import org.hornetq.jms.client.HornetQTopic;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.utils.json.JSONArray;
@@ -744,7 +744,7 @@
for (ServerSession session : sessions)
{
- if (session.getMetaData("jms-client-id") != null)
+ if (session.getMetaData("jms-session") != null)
{
jmsSessions.put(session.getConnectionID(), session);
}
@@ -936,4 +936,55 @@
}
return list;
}
+
+ public String[] listTargetDestinations(String sessionID) throws Exception
+ {
+ String[] addresses = server.getHornetQServer().getHornetQServerControl().listTargetAddresses(sessionID);
+ Map<String, DestinationControl> allDests = new HashMap<String, DestinationControl>();
+
+ Object[] queueControls = server.getHornetQServer().getManagementService().getResources(JMSQueueControl.class);
+ for (int i = 0; i < queueControls.length; i++)
+ {
+ JMSQueueControl queueControl = (JMSQueueControl)queueControls[i];
+ allDests.put(queueControl.getAddress(), queueControl);
+ }
+
+ Object[] topicControls = server.getHornetQServer().getManagementService().getResources(TopicControl.class);
+ for (int i = 0; i < topicControls.length; i++)
+ {
+ TopicControl topicControl = (TopicControl)topicControls[i];
+ allDests.put(topicControl.getAddress(), topicControl);
+ }
+
+ List<String> destinations = new ArrayList<String>();
+ for (int i = 0; i < addresses.length; i++)
+ {
+ DestinationControl control = allDests.get(addresses[i]);
+ if (control != null)
+ {
+ destinations.add(control.getAddress());
+ }
+ }
+ return destinations.toArray(new String[0]);
+ }
+
+ public String getLastSentMessageID(String sessionID, String address) throws Exception
+ {
+ ServerSession session = server.getHornetQServer().getSessionByID(sessionID);
+ if (session != null)
+ {
+ return session.getLastSentMessageID(address);
+ }
+ return null;
+ }
+
+ public String getSessionCreationTime(String sessionID) throws Exception
+ {
+ ServerSession session = server.getHornetQServer().getSessionByID(sessionID);
+ if (session != null)
+ {
+ return String.valueOf(session.getCreationTime());
+ }
+ return null;
+ }
}
Modified: branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-10-20 11:26:32 UTC (rev 9800)
+++ branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-10-20 14:00:18 UTC (rev 9801)
@@ -21,14 +21,12 @@
import javax.jms.Session;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.management.Parameter;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.security.Role;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQQueue;
/**
@@ -285,6 +283,21 @@
return (Boolean)proxy.invokeOperation("createTopic", name, jndiBinding);
}
+ public String[] listTargetDestinations(String sessionID) throws Exception
+ {
+ return null;
+ }
+
+ public String getLastSentMessageID(String sessionID, String address) throws Exception
+ {
+ return null;
+ }
+
+ public String getSessionCreationTime(String sessionID) throws Exception
+ {
+ return null;
+ }
+
};
}
// Public --------------------------------------------------------
13 years, 6 months
JBoss hornetq SVN: r9800 - trunk/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-10-20 07:26:32 -0400 (Wed, 20 Oct 2010)
New Revision: 9800
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
Log:
better logging for invalid frames
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java 2010-10-20 09:04:53 UTC (rev 9799)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java 2010-10-20 11:26:32 UTC (rev 9800)
@@ -549,15 +549,13 @@
for (int i = 0; i < data; i++)
{
char b = (char)bytes[i];
-
- if (b == '\n')
+
+ if (b < 33 || b > 136)
{
- str.append("\\n");
+ //Unreadable characters
+
+ str.append(bytes[i]);
}
- else if (b == 0)
- {
- str.append("NUL");
- }
else
{
str.append(b);
13 years, 6 months
JBoss hornetq SVN: r9799 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-20 05:04:53 -0400 (Wed, 20 Oct 2010)
New Revision: 9799
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
Log:
fixed test
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2010-10-20 08:17:20 UTC (rev 9798)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2010-10-20 09:04:53 UTC (rev 9799)
@@ -72,9 +72,9 @@
waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(2, "queues.testaddress", 1, 1, true);
- /*waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 2, false);*/
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
@@ -171,9 +171,9 @@
waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(2, "queues.testaddress", 1, 1, true);
- /*waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 2, false);*/
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
try
{
13 years, 6 months
JBoss hornetq SVN: r9798 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/server and 10 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-20 04:17:20 -0400 (Wed, 20 Oct 2010)
New Revision: 9798
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerAction.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/RealNodeManagerTest.java
Removed:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/LockFileImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/LockFileImplTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/HornetQServer.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/TopicClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
updated locking mechanism and updated tests
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -1231,7 +1231,7 @@
return pairs.get(live);
}
- class StaticConnector
+ class StaticConnector implements Serializable
{
private List<Connector> connectors;
@@ -1272,6 +1272,10 @@
log.debug("unable to connect with static connector " + connectors.get(i).initialConnector);
}
}
+ if (csf == null)
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
+ }
}
catch (InterruptedException e)
{
@@ -1325,7 +1329,7 @@
factory = getFactory();
try
{
- factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+ factory.connect(reconnectAttempts, failoverOnInitialConnection);
}
catch (HornetQException e)
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/HornetQServer.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/HornetQServer.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -143,4 +143,6 @@
ReplicationManager getReplicationManager();
boolean checkActivate() throws Exception;
+
+ void kill() throws Exception;
}
Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.utils.UUID;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ * Date: Oct 13, 2010
+ * Time: 2:38:40 PM
+ */
+public abstract class NodeManager implements HornetQComponent
+{
+ public abstract void awaitLiveNode() throws Exception;
+
+ public abstract void startBackup() throws Exception;
+
+ public abstract void startLiveNode() throws Exception;
+
+ public abstract void pauseLiveServer() throws Exception;
+
+ public abstract void crashLiveServer() throws Exception;
+
+ public abstract void stopBackup() throws Exception;
+
+ private boolean isStarted = false;
+
+ protected volatile SimpleString nodeID;
+
+ protected volatile UUID uuid;
+
+ public void start() throws Exception
+ {
+ isStarted = true;
+ }
+
+ public void stop() throws Exception
+ {
+ isStarted = false;
+ }
+
+ public boolean isStarted()
+ {
+ return isStarted;
+ }
+
+
+ public SimpleString getNodeId()
+ {
+ return nodeID;
+ }
+
+ public UUID getUUID()
+ {
+ return uuid;
+ }
+}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -218,6 +218,7 @@
}
});
locator.setNodeID(nodeUUID.toString());
+ locator.setReconnectAttempts(-1);
backupSessionFactory = locator.connect();
backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), nodeUUID.toString(), true, configuration.getConnectorConfigurations().get(connectorConfiguration.getConnector())));
}
@@ -743,6 +744,7 @@
serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
serverLocator.setNodeID(nodeUUID.toString());
+ serverLocator.setReconnectAttempts(-1);
}
else if (config.getDiscoveryGroupName() != null)
{
@@ -757,6 +759,7 @@
serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(dg.getGroupAddress(), dg.getGroupPort());
serverLocator.setNodeID(nodeUUID.toString());
+ serverLocator.setReconnectAttempts(-1);
}
else
{
Deleted: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -1,152 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server.cluster.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.hornetq.core.server.cluster.LockFile;
-
-/**
- * A FakeLockFile
- *
- * A VM-wide exclusive lock on a file.
- *
- * Advisory only.
- *
- * Used for testing.
- *
- * @author Tim Fox
- *
- *
- */
-public class FakeLockFile implements LockFile
-{
- private final String fileName;
-
- private final String directory;
-
- private final static Map<String, Semaphore> locks = new HashMap<String, Semaphore>();
-
- private Semaphore semaphore;
- /**
- * @param fileName
- * @param directory
- */
- public FakeLockFile(final String fileName, final String directory)
- {
- this.fileName = fileName;
-
- this.directory = directory;
-
- synchronized (locks)
- {
- String key = directory + "/" + fileName;
-
- semaphore = locks.get(key);
-
- if (semaphore == null)
- {
- semaphore = new Semaphore(1, true);
-
- locks.put(key, semaphore);
-
- File f = new File(directory, fileName);
-
- try
- {
- f.createNewFile();
- }
- catch (IOException e)
- {
- e.printStackTrace();
- throw new IllegalStateException(e);
- }
-
- if(!f.exists())
- {
- throw new IllegalStateException("unable to create " + directory + fileName);
- }
- }
- }
- }
-
- public String getFileName()
- {
- return fileName;
- }
-
- public String getDirectory()
- {
- return directory;
- }
-
- public void lock() throws IOException
- {
- try
- {
- semaphore.acquire();
- }
- catch (InterruptedException e)
- {
- throw new IOException(e);
- }
- }
-
- public synchronized boolean unlock() throws IOException
- {
- semaphore.drainPermits();
- semaphore.release();
- return true;
- }
-
- public static void unlock(final String fileName, final String directory)
- {
- String key = directory + "/" + fileName;
-
- Semaphore semaphore = locks.get(key);
-
- semaphore.release();
- }
-
- public static void clearLocks()
- {
- for (Semaphore semaphore : locks.values())
- {
- semaphore.drainPermits();
- }
- locks.clear();
- }
-
- public static void clearLocks(String dir)
- {
- List<String> toClear = new ArrayList<String>();
- for (Map.Entry<String, Semaphore> e : locks.entrySet())
- {
- if(e.getKey().startsWith(dir))
- {
- e.getValue().drainPermits();
- toClear.add(e.getKey());
- }
- }
- for (String s : toClear)
- {
- locks.remove(s);
- }
- }
-}
Deleted: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/LockFileImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/LockFileImpl.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/LockFileImpl.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -1,159 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server.cluster.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.cluster.LockFile;
-
-/**
- * A FailoverLockFileImpl
- *
- * The lock is per VM!
- *
- * Won't work well with NFS or GFS
- *
- * @author Tim Fox
- *
- */
-public class LockFileImpl implements LockFile
-{
- private static final Logger log = Logger.getLogger(LockFileImpl.class);
-
- private final String fileName;
-
- private final String directory;
-
- private RandomAccessFile raFile;
-
- private FileLock lock;
-
- /*
- * This method is "mainly" for testing (apologies for pun)
- */
- public static final void main(String[] args)
- {
- LockFileImpl lock = new LockFileImpl(args[0], args[1]);
-
- long time = Long.parseLong(args[2]);
-
- try
- {
- lock.lock();
- }
- catch (IOException e)
- {
- log.error("Failed to get lock", e);
- }
-
- log.info("Sleeping for " + time + " ms");
-
- try
- {
- Thread.sleep(time);
- }
- catch (InterruptedException e)
- {
- }
-
- try
- {
- lock.unlock();
- }
- catch (IOException e)
- {
- log.error("Failed to unlock", e);
- }
- }
-
- /**
- * @param fileName
- * @param directory
- */
- public LockFileImpl(final String fileName, final String directory)
- {
- this.fileName = fileName;
-
- this.directory = directory;
- }
-
- public String getFileName()
- {
- return fileName;
- }
-
- public String getDirectory()
- {
- return directory;
- }
-
- private final Object lockLock = new Object();
-
- private final Object unlockLock = new Object();
-
- public void lock() throws IOException
- {
- synchronized (lockLock)
- {
- File file = new File(directory, fileName);
-
- log.info("Trying to create " + file.getCanonicalPath());
-
- if (!file.exists())
- {
- file.createNewFile();
- }
-
- raFile = new RandomAccessFile(file, "rw");
-
- FileChannel channel = raFile.getChannel();
-
- // Try and obtain exclusive lock
- log.info("Trying to obtain exclusive lock on " + fileName);
-
- lock = channel.lock();
-
- log.info("obtained lock");
- }
- }
-
- public boolean unlock() throws IOException
- {
- synchronized (unlockLock)
- {
- if (lock == null)
- {
- return false;
- }
-
- lock.release();
-
- lock = null;
-
- raFile.close();
-
- raFile = null;
-
- log.info("Released lock on " + fileName);
-
- return true;
- }
- }
-
-}
Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.impl;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.utils.UUID;
+import org.hornetq.utils.UUIDGenerator;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ * Date: Oct 13, 2010
+ * Time: 2:44:02 PM
+ */
+public class FileLockNodeManager extends NodeManager
+{
+ private static final Logger log = Logger.getLogger(FileLockNodeManager.class);
+
+ private final String SERVER_LOCK_NAME = "server.lock";
+
+ private static final byte LIVE = 'L';
+
+ private static final byte FAILINGBACK = 'F';
+
+ private static final byte PAUSED = 'P';
+
+ private static final byte NOT_STARTED = 'N';
+
+ private FileChannel channel;
+
+ private FileLock liveLock;
+
+ private FileLock backupLock;
+
+ private final String directory;
+
+ public FileLockNodeManager(final String directory)
+ {
+ this.directory = directory;
+ }
+
+ public void start() throws Exception
+ {
+ if(isStarted())
+ {
+ return;
+ }
+ File file = new File(directory, SERVER_LOCK_NAME);
+
+ if (!file.exists())
+ {
+ file.createNewFile();
+ }
+
+ RandomAccessFile raFile = new RandomAccessFile(file, "rw");
+
+ channel = raFile.getChannel();
+
+ createNodeId();
+
+ super.start();
+ }
+
+ public void stop() throws Exception
+ {
+ channel.close();
+
+ super.stop();
+ }
+
+
+ public void awaitLiveNode() throws Exception
+ {
+ do
+ {
+ while (getState() == NOT_STARTED)
+ {
+ Thread.sleep(2000);
+ }
+
+ liveLock = channel.lock(1, 1, false);
+
+ byte state = getState();
+
+ if (state == PAUSED)
+ {
+ liveLock.release();
+ Thread.sleep(2000);
+ }
+ else if (state == FAILINGBACK)
+ {
+ liveLock.release();
+ Thread.sleep(2000);
+ }
+ else if (state == LIVE)
+ {
+ releaseBackupLock();
+
+ break;
+ }
+ }
+ while (true);
+ }
+
+ public void startBackup() throws Exception
+ {
+
+ log.info("Waiting to become backup node");
+
+ backupLock = channel.lock(2, 1, false);
+
+ log.info("** got backup lock");
+
+ readNodeId();
+ }
+
+ public void startLiveNode() throws Exception
+ {
+ setFailingBack();
+
+ log.info("Waiting to obtain live lock");
+
+ liveLock = channel.lock(1, 1, false);
+
+ log.info("Live Server Obtained live lock");
+
+ setLive();
+ }
+
+ public void pauseLiveServer() throws Exception
+ {
+ setPaused();
+ liveLock.release();
+ }
+
+ public void crashLiveServer() throws Exception
+ {
+ //overkill as already set to live
+ setLive();
+ liveLock.release();
+ }
+
+ public void stopBackup() throws Exception
+ {
+ backupLock.release();
+ }
+
+ private void setLive() throws Exception
+ {
+ ByteBuffer bb = ByteBuffer.allocateDirect(1);
+ bb.put(LIVE);
+ bb.position(0);
+ channel.write(bb, 0);
+ channel.force(true);
+ }
+
+ private void setFailingBack() throws Exception
+ {
+ ByteBuffer bb = ByteBuffer.allocateDirect(1);
+ bb.put(FAILINGBACK);
+ bb.position(0);
+ channel.write(bb, 0);
+ channel.force(true);
+ }
+
+ private void setPaused() throws Exception
+ {
+ ByteBuffer bb = ByteBuffer.allocateDirect(1);
+ bb.put(PAUSED);
+ bb.position(0);
+ channel.write(bb, 0);
+ channel.force(true);
+ }
+
+ private byte getState() throws Exception
+ {
+ ByteBuffer bb = ByteBuffer.allocateDirect(1);
+ int read;
+ read = channel.read(bb, 0);
+ if (read <= 0)
+ {
+ return NOT_STARTED;
+ }
+ else
+ return bb.get(0);
+ }
+
+ private void releaseBackupLock() throws Exception
+ {
+ if (backupLock != null)
+ {
+ backupLock.release();
+ }
+ }
+
+ private void createNodeId() throws Exception
+ {
+ ByteBuffer id = ByteBuffer.allocateDirect(16);
+ int read = channel.read(id, 3);
+ if(read != 16)
+ {
+ uuid = UUIDGenerator.getInstance().generateUUID();
+ nodeID = new SimpleString(uuid.toString());
+ id.put(uuid.asBytes(), 0, 16);
+ id.position(0);
+ channel.write(id, 3);
+ channel.force(true);
+ }
+ else
+ {
+ byte[] bytes = new byte[16];
+ id.position(0);
+ id.get(bytes);
+ uuid = new UUID(UUID.TYPE_TIME_BASED, bytes);
+ nodeID = new SimpleString(uuid.toString());
+ }
+ }
+
+ private void readNodeId() throws Exception
+ {
+ ByteBuffer id = ByteBuffer.allocateDirect(16);
+ int read = channel.read(id, 3);
+ if(read != 16)
+ {
+ throw new IllegalStateException("live server did not write id to file");
+ }
+ else
+ {
+ byte[] bytes = new byte[16];
+ id.position(0);
+ id.get(bytes);
+ uuid = new UUID(UUID.TYPE_TIME_BASED, bytes);
+ nodeID = new SimpleString(uuid.toString());
+ }
+ }
+}
+
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -13,13 +13,7 @@
package org.hornetq.core.server.impl;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.security.AccessController;
import java.security.PrivilegedAction;
@@ -44,7 +38,6 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.config.BackupConnectorConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.DivertConfiguration;
@@ -87,18 +80,10 @@
import org.hornetq.core.security.Role;
import org.hornetq.core.security.SecurityStore;
import org.hornetq.core.security.impl.SecurityStoreImpl;
-import org.hornetq.core.server.ActivateCallback;
-import org.hornetq.core.server.Divert;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.MemoryManager;
-import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.QueueFactory;
-import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.*;
import org.hornetq.core.server.cluster.ClusterManager;
-import org.hornetq.core.server.cluster.LockFile;
import org.hornetq.core.server.cluster.Transformer;
import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
-import org.hornetq.core.server.cluster.impl.LockFileImpl;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
@@ -120,8 +105,6 @@
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.OrderedExecutorFactory;
import org.hornetq.utils.SecurityFormatter;
-import org.hornetq.utils.UUID;
-import org.hornetq.utils.UUIDGenerator;
import org.hornetq.utils.VersionLoader;
/**
@@ -144,9 +127,6 @@
// Attributes
// -----------------------------------------------------------------------------------
- private volatile SimpleString nodeID;
-
- private volatile UUID uuid;
private final Version version;
@@ -215,6 +195,7 @@
private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
private volatile GroupingHandler groupingHandler;
+ private NodeManager nodeManager;
// Constructors
// ---------------------------------------------------------------------------------
@@ -279,69 +260,23 @@
private interface Activation extends Runnable
{
- void close() throws Exception;
+ void close(boolean permanently) throws Exception;
}
/*
* Can be overridden for tests
*/
- protected LockFile createLockFile(final String fileName, final String directory)
+ protected NodeManager createNodeManager(final String directory)
{
- return new LockFileImpl(fileName, directory);
+ return new FileLockNodeManager(directory);
}
private class NoSharedStoreLiveActivation implements Activation
{
- LockFile liveLock;
-
public void run()
{
try
{
- checkJournalDirectory();
-
- // We now load the node id file, creating it, if it doesn't exist yet
- File nodeIDFile = new File(configuration.getJournalDirectory(), "node.id");
-
- if (!nodeIDFile.exists())
- {
- // We use another file lock to prevent a backup reading it before it is complete
-
- LockFile nodeIDLockFile = createLockFile("nodeid.lock", configuration.getJournalDirectory());
-
- nodeIDLockFile.lock();
-
- OutputStream os = null;
-
- try
- {
- os = new BufferedOutputStream(new FileOutputStream(nodeIDFile));
-
- uuid = UUIDGenerator.getInstance().generateUUID();
-
- nodeID = new SimpleString(uuid.toString());
-
- os.write(uuid.asBytes());
-
- log.info("Wrote node id, it is " + nodeID);
- }
- finally
- {
- if (os != null)
- {
- os.close();
- }
- }
-
- nodeIDLockFile.unlock();
- }
- else
- {
- // Read it
-
- readNodeID(nodeIDFile);
- }
-
initialisePart1();
initialisePart2();
@@ -354,27 +289,14 @@
}
}
- public void close() throws Exception
+ public void close(boolean permanently) throws Exception
{
- if (liveLock != null)
- {
- // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
- // started before the live
- File liveFile = new File(configuration.getJournalDirectory(), "live.lock");
-
- liveFile.delete();
-
- liveLock.unlock();
-
- }
}
}
private class SharedStoreLiveActivation implements Activation
{
- LockFile liveLock;
-
public void run()
{
try
@@ -383,54 +305,8 @@
checkJournalDirectory();
- liveLock = createLockFile("live.lock", configuration.getJournalDirectory());
+ nodeManager.startLiveNode();
- liveLock.lock();
-
- log.info("Live Server Obtained live lock");
-
- // We now load the node id file, creating it, if it doesn't exist yet
- File nodeIDFile = new File(configuration.getJournalDirectory(), "node.id");
-
- if (!nodeIDFile.exists())
- {
- // We use another file lock to prevent a backup reading it before it is complete
-
- LockFile nodeIDLockFile = createLockFile("nodeid.lock", configuration.getJournalDirectory());
-
- nodeIDLockFile.lock();
-
- OutputStream os = null;
-
- try
- {
- os = new BufferedOutputStream(new FileOutputStream(nodeIDFile));
-
- uuid = UUIDGenerator.getInstance().generateUUID();
-
- nodeID = new SimpleString(uuid.toString());
-
- os.write(uuid.asBytes());
-
- log.info("Wrote node id, it is " + nodeID);
- }
- finally
- {
- if (os != null)
- {
- os.close();
- }
- }
-
- nodeIDLockFile.unlock();
- }
- else
- {
- // Read it
-
- readNodeID(nodeIDFile);
- }
-
initialisePart1();
initialisePart2();
@@ -443,163 +319,37 @@
}
}
- public void close() throws Exception
+ public void close(boolean permanently) throws Exception
{
- if (liveLock != null)
+ if(permanently)
{
- // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
- // started before the live
- log.info("Live Server about to delete Live Lock file");
- File liveFile = new File(configuration.getJournalDirectory(), "live.lock");
- log.info("Live Server deleting Live Lock file");
- liveFile.delete();
-
- liveLock.unlock();
- log.info("Live server unlocking live lock");
-
+ nodeManager.crashLiveServer();
}
- }
- }
-
- private void readNodeID(final File nodeIDFile) throws Exception
- {
- // Read it
- InputStream is = null;
-
- try
- {
- is = new BufferedInputStream(new FileInputStream(nodeIDFile));
-
- byte[] bytes = new byte[16];
-
- int read = 0;
-
- while (read < 16)
+ else
{
- int r = is.read(bytes, read, 16 - read);
-
- if (r <= 0)
- {
- throw new IllegalStateException("Cannot read node id file, perhaps it is corrupt?");
- }
-
- read += r;
+ nodeManager.pauseLiveServer();
}
-
- uuid = new UUID(UUID.TYPE_TIME_BASED, bytes);
-
- nodeID = new SimpleString(uuid.toString());
-
- log.info("Read node id, it is " + nodeID);
}
- finally
- {
- if (is != null)
- {
- is.close();
- }
- }
}
+
private class SharedStoreBackupActivation implements Activation
{
- LockFile backupLock;
-
- LockFile liveLock;
-
public void run()
{
try
{
- checkJournalDirectory();
+ nodeManager.startBackup();
- backupLock = createLockFile("backup.lock", configuration.getJournalDirectory());
-
- log.info("Waiting to become backup node");
-
- backupLock.lock();
-
- log.info("** got backup lock");
-
- // We load the node id from the file in the journal dir - if the backup is started before live and live has
- // never been started before it may not exist yet, so
- // we wait for it
-
- File nodeIDFile = new File(configuration.getJournalDirectory(), "node.id");
-
- while (true)
- {
- // We also need to create another lock file for the node.id file since we don't want to see any partially
- // written
- // node id if the live node is still creating it.
- // Also renaming is not atomic necessarily so we can't use a write and rename strategy safely
-
- LockFile nodeIDLockFile = createLockFile("nodeid.lock", configuration.getJournalDirectory());
-
- nodeIDLockFile.lock();
- log.info("Backup server waiting for node id file creation");
- if (!nodeIDFile.exists())
- {
- nodeIDLockFile.unlock();
-
- Thread.sleep(2000);
- log.info("Backup server still waiting for node id file creation");
- continue;
- }
- log.info("Backup server waited for node id file creation");
- nodeIDLockFile.unlock();
-
- break;
- }
-
- readNodeID(nodeIDFile);
-
- log.info("Read node id " + nodeID);
-
initialisePart1();
-
- //TODO TODO at this point the clustermanager needs to announce it's presence so the cluster can know about the backup
- // We now look for the live.lock file - if it doesn't exist it means the live isn't started yet, so we wait
- // for that
- while (true)
- {
- File liveLockFile = new File(configuration.getJournalDirectory(), "live.lock");
- log.info("Backup server waiting for live lock file creation");
- while (!liveLockFile.exists())
- {
- log.debug("Waiting for server live lock file. Live server is not started");
+ clusterManager.start();
- Thread.sleep(2000);
- }
- log.info("Backup server waited for live lock file creation");
+ started = true;
- liveLock = createLockFile("live.lock", configuration.getJournalDirectory());
+ log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "] started");
-
- clusterManager.start();
-
- started = true;
-
- log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeID + "] started");
-
- liveLock.lock();
-
- // We need to test if the file exists again, since the live might have shutdown
- if (!liveLockFile.exists())
- {
- liveLock.unlock();
-
- continue;
- }
-
- log.info("Backup server obtained live lock");
-
- // Announce presence of live node to cluster
-
-
- break;
- }
+ nodeManager.awaitLiveNode();
configuration.setBackup(false);
@@ -608,8 +358,6 @@
clusterManager.activate();
log.info("Backup Server is now live");
-
- backupLock.unlock();
}
catch (InterruptedException e)
{
@@ -622,9 +370,13 @@
log.error("Failure in initialisation", e);
}
}
+ catch(Throwable e)
+ {
+ log.error("Failure in initialisation", e);
+ }
}
- public void close() throws Exception
+ public void close(boolean permanently) throws Exception
{
if (configuration.isBackup())
{
@@ -644,28 +396,21 @@
log.warn("Timed out waiting for backup activation to exit");
}
- if (liveLock != null)
- {
- liveLock.unlock();
- }
-
- if (backupLock != null)
- {
- backupLock.unlock();
- }
+ nodeManager.stopBackup();
}
else
{
//if we are now live, behave as live
// We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
// started before the live
- log.info("Live Server about to delete Live Lock file");
- File liveFile = new File(configuration.getJournalDirectory(), "live.lock");
- log.info("Live Server deleting Live Lock file");
- liveFile.delete();
-
- liveLock.unlock();
- log.info("Live server unlocking live lock");
+ if(permanently)
+ {
+ nodeManager.crashLiveServer();
+ }
+ else
+ {
+ nodeManager.pauseLiveServer();
+ }
}
}
}
@@ -688,7 +433,7 @@
}
}
- public void close() throws Exception
+ public void close(boolean permanently) throws Exception
{
}
}
@@ -701,6 +446,12 @@
{
initialiseLogging();
+ checkJournalDirectory();
+
+ nodeManager = createNodeManager(configuration.getJournalDirectory());
+
+ nodeManager.start();
+
if (started)
{
HornetQServerImpl.log.info((configuration.isBackup() ? "backup" : "live") + " is already started, ignoring the call to start..");
@@ -734,7 +485,7 @@
}
started = true;
- HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeID + "] started");
+ HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "] started");
}
@@ -771,8 +522,18 @@
super.finalize();
}
+ public void kill() throws Exception
+ {
+ stop(true);
+ }
+
public void stop() throws Exception
{
+ stop(false);
+ }
+
+ public void stop(boolean permanently) throws Exception
+ {
System.out.println("*** stop called on server");
System.out.flush();
@@ -901,12 +662,11 @@
started = false;
initialised = false;
// to display in the log message
- SimpleString tempNodeID = nodeID;
- nodeID = null;
+ SimpleString tempNodeID = getNodeID();
if (activation != null)
{
- activation.close();
+ activation.close(permanently);
}
if (backupActivationThread != null)
@@ -914,6 +674,10 @@
backupActivationThread.join();
}
+ nodeManager.stop();
+
+ nodeManager = null;
+
HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + tempNodeID + "] stopped");
Logger.reset();
@@ -1119,7 +883,7 @@
public SimpleString getNodeID()
{
- return nodeID;
+ return nodeManager == null?null:nodeManager.getNodeId();
}
public Queue createQueue(final SimpleString address,
@@ -1524,7 +1288,7 @@
scheduledPool,
managementService,
configuration,
- uuid,
+ nodeManager.getUUID(),
configuration.isBackup(),
configuration.isClustered());
@@ -1659,7 +1423,7 @@
true,
false);
- Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeID);
+ Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());
queues.put(queueBindingInfo.getId(), queue);
@@ -1759,7 +1523,7 @@
durable,
temporary);
- binding = new LocalQueueBinding(address, queue, nodeID);
+ binding = new LocalQueueBinding(address, queue, nodeManager.getNodeId());
if (durable)
{
Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.impl;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.utils.UUIDGenerator;
+
+import java.util.concurrent.Semaphore;
+
+import static org.hornetq.core.server.impl.InVMNodeManager.State.*;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ * Date: Oct 13, 2010
+ * Time: 3:55:47 PM
+ */
+public class InVMNodeManager extends NodeManager
+{
+
+ private Semaphore liveLock;
+
+ private Semaphore backupLock;
+
+ public enum State {LIVE, PAUSED, FAILING_BACK, NOT_STARTED}
+
+ public State state = NOT_STARTED;
+
+ public InVMNodeManager()
+ {
+ liveLock = new Semaphore(1);
+ backupLock = new Semaphore(1);
+ uuid = UUIDGenerator.getInstance().generateUUID();
+ nodeID = new SimpleString(uuid.toString());
+ }
+
+ @Override
+ public void awaitLiveNode() throws Exception
+ {
+ do
+ {
+ while (state == NOT_STARTED)
+ {
+ Thread.sleep(2000);
+ }
+
+ liveLock.acquire();
+
+ if (state == PAUSED)
+ {
+ liveLock.release();
+ Thread.sleep(2000);
+ }
+ else if (state == FAILING_BACK)
+ {
+ liveLock.release();
+ Thread.sleep(2000);
+ }
+ else if (state == LIVE)
+ {
+ releaseBackupNode();
+ break;
+ }
+ }
+ while (true);
+ }
+
+ @Override
+ public void startBackup() throws Exception
+ {
+ backupLock.acquire();
+ }
+
+ @Override
+ public void startLiveNode() throws Exception
+ {
+ state = FAILING_BACK;
+ liveLock.acquire();
+ state = LIVE;
+ }
+
+ @Override
+ public void pauseLiveServer() throws Exception
+ {
+ state = PAUSED;
+ liveLock.release();
+ }
+
+ @Override
+ public void crashLiveServer() throws Exception
+ {
+ // defensive: state should already be LIVE here; reassert before releasing the lock
+ state = LIVE;
+ liveLock.release();
+ }
+
+ @Override
+ public void stopBackup() throws Exception
+ {
+ backupLock.release();
+ }
+
+ private void releaseBackupNode()
+ {
+ if(backupLock != null)
+ {
+ backupLock.release();
+ }
+ }
+}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -633,7 +633,7 @@
bcConfigs1.add(bcConfig1);
liveConf.setBroadcastGroupConfigurations(bcConfigs1);
- liveService = createFakeLockServer(false, liveConf);
+ liveService = createServer(false, liveConf);
liveService.start();
}
}
Deleted: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/LockFileImplTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/LockFileImplTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/LockFileImplTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -1,140 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.cluster;
-
-import java.io.IOException;
-
-import org.hornetq.core.server.cluster.impl.LockFileImpl;
-import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- * A LockFileImplTest
- *
- * @author jmesnil
- *
- *
- */
-public class LockFileImplTest extends UnitTestCase
-{
-
- // Constants -----------------------------------------------------
-
- /**
- * A ThreadExtension
- *
- * @author jmesnil
- *
- *
- */
- private final class Activation extends Thread
- {
- private LockFileImpl backupLock;
- private LockFileImpl liveLock;
-
- public void run() {
- backupLock = new LockFileImpl(RandomUtil.randomString(), System.getProperty("java.io.tmpdir"));
- try
- {
- backupLock.lock();
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
-
- liveLock = new LockFileImpl(liveLockFileName, System.getProperty("java.io.tmpdir"));
- try
- {
- liveLock.lock();
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
-
- public void close() throws IOException
- {
- if (liveLock != null)
- {
- liveLock.unlock();
- }
- if (backupLock != null)
- {
- backupLock.unlock();
- }
- }
- }
-
- public static final String liveLockFileName = "liveLock";
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public static void main(String[] args)
- {
- try
- {
- final LockFileImpl liveLock = new LockFileImpl(liveLockFileName, System.getProperty("java.io.tmpdir"));
- liveLock.lock();
- Thread.sleep(1000000);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-
- // 1. Run the class as a Java application to execute the main() in a separate VM
- // 2. Run this test
- public void testInterrupt() throws Exception
- {
- Activation t = new Activation();
- t.start();
-
- System.out.println("sleep");
- Thread.sleep(5000);
-
- t.close();
-
- long timeout = 10000;
- long start = System.currentTimeMillis();
- while (t.isAlive() && System.currentTimeMillis() - start < timeout)
- {
- System.out.println("before interrupt");
- t.interrupt();
- System.out.println("after interrupt");
-
- Thread.sleep(1000);
- }
-
- assertFalse(t.isAlive());
-
- t.join();
-
- }
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerAction.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerAction.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerAction.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster;
+
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.impl.FileLockNodeManager;
+
+import java.nio.channels.FileLock;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ * Date: Oct 18, 2010
+ * Time: 10:09:12 AM
+ */
+public class NodeManagerAction
+{
+ public final static int START_LIVE = 0;
+ public final static int START_BACKUP = 1;
+ public final static int CRASH_LIVE = 2;
+ public final static int PAUSE_LIVE = 3;
+ public final static int STOP_BACKUP = 4;
+ public final static int AWAIT_LIVE = 5;
+
+ public final static int HAS_LIVE = 10;
+ public final static int HAS_BACKUP = 11;
+ public final static int DOESNT_HAVE_LIVE = 12;
+ public final static int DOESNT_HAVE_BACKUP = 13;
+
+ private final int[] work;
+
+ boolean hasLiveLock = false;
+ boolean hasBackupLock = false;
+
+ public NodeManagerAction(int... work)
+ {
+ this.work = work;
+ }
+
+ public void performWork(NodeManager nodeManager) throws Exception
+ {
+ for (int action : work)
+ {
+ switch (action)
+ {
+ case START_LIVE:
+ nodeManager.startLiveNode();
+ hasLiveLock = true;
+ hasBackupLock = false;
+ break;
+ case START_BACKUP:
+ nodeManager.startBackup();
+ hasBackupLock = true;
+ break;
+ case CRASH_LIVE:
+ nodeManager.crashLiveServer();
+ hasLiveLock = false;
+ break;
+ case PAUSE_LIVE:
+ nodeManager.pauseLiveServer();
+ hasLiveLock = false;
+ break;
+ case STOP_BACKUP:
+ nodeManager.stopBackup();
+ hasBackupLock = false;
+ break;
+ case AWAIT_LIVE:
+ nodeManager.awaitLiveNode();
+ hasLiveLock = true;
+ hasBackupLock = false;
+ break;
+ case HAS_LIVE:
+ if (!hasLiveLock)
+ {
+ throw new IllegalStateException("live lock not held");
+ }
+ break;
+ case HAS_BACKUP:
+
+ if (!hasBackupLock)
+ {
+ throw new IllegalStateException("backup lock not held");
+ }
+ break;
+ case DOESNT_HAVE_LIVE:
+ if (hasLiveLock)
+ {
+ throw new IllegalStateException("live lock held");
+ }
+ break;
+ case DOESNT_HAVE_BACKUP:
+
+ if (hasBackupLock)
+ {
+ throw new IllegalStateException("backup lock held");
+ }
+ break;
+ }
+ }
+ }
+
+ public String[] getWork()
+ {
+ String[] strings = new String[work.length];
+ for (int i = 0, stringsLength = strings.length; i < stringsLength; i++)
+ {
+ strings[i] = "" + work[i];
+ }
+ return strings;
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ int[] work1 = new int[args.length];
+ for (int i = 0; i < args.length; i++)
+ {
+ work1[i] = Integer.parseInt(args[i]);
+
+ }
+ NodeManagerAction nodeManagerAction = new NodeManagerAction(work1);
+ FileLockNodeManager nodeManager = new FileLockNodeManager(".");
+ nodeManager.start();
+ try
+ {
+ nodeManagerAction.performWork(nodeManager);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ System.exit(9);
+ }
+ System.out.println("work performed");
+ }
+
+}
Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/NodeManagerTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster;
+
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.tests.util.ServiceTestBase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hornetq.tests.integration.cluster.NodeManagerAction.*;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ * Date: Oct 16, 2010
+ * Time: 9:22:32 AM
+ */
+public class NodeManagerTest extends ServiceTestBase
+{
+ public void testLive() throws Exception
+ {
+ NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ performWork(live1);
+ }
+ public void testSimpleLiveAndBackup() throws Exception
+ {
+ NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, PAUSE_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ performWork(live1, backup1);
+ }
+
+ public void testSimpleBackupAndLive() throws Exception
+ {
+ NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, PAUSE_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ performWork(backup1, live1);
+ }
+
+ public void testSimpleLiveAnd2Backups() throws Exception
+ {
+ NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ performWork(live1, backup1, backup2);
+ }
+
+
+ public void testSimple2BackupsAndLive() throws Exception
+ {
+ NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ performWork(backup1, backup2, live1);
+ }
+
+ public void testSimpleLiveAnd2BackupsPaused() throws Exception
+ {
+ NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, PAUSE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, PAUSE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ performWork(live1, backup1, backup2);
+ }
+
+ public void testSimple2BackupsPausedAndLive() throws Exception
+ {
+ NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, PAUSE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, PAUSE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ performWork(backup1, backup2, live1);
+ }
+
+ public void testBackupsOnly() throws Exception
+ {
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, STOP_BACKUP, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, STOP_BACKUP, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup3 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, STOP_BACKUP, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup4 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, STOP_BACKUP, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup5 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, STOP_BACKUP, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup6 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, STOP_BACKUP, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup7 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, STOP_BACKUP, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup8 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, STOP_BACKUP, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup9 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, STOP_BACKUP, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup10 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, STOP_BACKUP, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ NodeManagerAction backup11 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, STOP_BACKUP, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
+ performWork(backup1,backup2,backup3,backup4,backup5,backup6,backup7,backup8,backup9,backup10,backup11);
+ }
+
+ public void testLiveAndBackupLiveForcesFailback() throws Exception
+ {
+ NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, AWAIT_LIVE, HAS_LIVE, PAUSE_LIVE);
+ performWork(live1, backup1);
+ }
+
+ public void testLiveAnd2BackupsLiveForcesFailback() throws Exception
+ {
+ NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE);
+ NodeManagerAction backup1 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE);
+ NodeManagerAction backup2 = new NodeManagerAction(DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_BACKUP, HAS_BACKUP, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, AWAIT_LIVE, HAS_LIVE, CRASH_LIVE);
+ performWork(live1, backup1, backup2);
+ }
+
+ public void performWork(NodeManagerAction... actions) throws Exception
+ {
+ NodeManager nodeManager = new InVMNodeManager();
+ List<NodeRunner> nodeRunners = new ArrayList<NodeRunner>();
+ Thread[] threads = new Thread[actions.length];
+ for (NodeManagerAction action : actions)
+ {
+ NodeRunner nodeRunner = new NodeRunner(nodeManager, action);
+ nodeRunners.add(nodeRunner);
+ }
+ for (int i = 0, nodeRunnersSize = nodeRunners.size(); i < nodeRunnersSize; i++)
+ {
+ NodeRunner nodeRunner = nodeRunners.get(i);
+ threads[i] = new Thread(nodeRunner);
+ threads[i].start();
+ }
+
+ for (Thread thread : threads)
+ {
+ try
+ {
+ thread.join(5000);
+ }
+ catch (InterruptedException e)
+ {
+ // interrupted while joining; fall through to the isAlive() check below
+ }
+ if(thread.isAlive())
+ {
+ thread.interrupt();
+ fail("thread still running");
+ }
+ }
+
+ for (NodeRunner nodeRunner : nodeRunners)
+ {
+ if(nodeRunner.e != null)
+ {
+ nodeRunner.e.printStackTrace();
+ fail(nodeRunner.e.getMessage());
+ }
+ }
+ }
+
+ static class NodeRunner implements Runnable
+ {
+ private NodeManagerAction action;
+ private NodeManager manager;
+ Throwable e;
+ public NodeRunner(NodeManager nodeManager, NodeManagerAction action)
+ {
+ this.manager = nodeManager;
+ this.action = action;
+ }
+
+ public void run()
+ {
+ try
+ {
+ action.performWork(manager);
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ }
+ }
+
+
+}
Added: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/RealNodeManagerTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/RealNodeManagerTest.java (rev 0)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/RealNodeManagerTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster;
+
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.impl.FileLockNodeManager;
+import org.hornetq.tests.util.SpawnedVMSupport;
+import org.hornetq.utils.UUID;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ * Date: Oct 18, 2010
+ * Time: 10:34:25 AM
+ */
+public class RealNodeManagerTest extends NodeManagerTest
+{
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ File file = new File(".", "server.lock");
+ if(file.exists())
+ {
+ file.delete();
+ }
+ }
+
+ public void testId() throws Exception
+ {
+ NodeManager nodeManager = new FileLockNodeManager(".");
+ nodeManager.start();
+ UUID id1 = nodeManager.getUUID();
+ nodeManager.stop();
+ nodeManager.start();
+ assertEqualsByteArrays(id1.asBytes(), nodeManager.getUUID().asBytes());
+ nodeManager.stop();
+ }
+ @Override
+ public void performWork(NodeManagerAction... actions) throws Exception
+ {
+ List<Process> processes = new ArrayList<Process>();
+ for (NodeManagerAction action : actions)
+ {
+ Process p = SpawnedVMSupport.spawnVM(NodeManagerAction.class.getName(),"-Xms512m -Xmx512m ", new String[0], true, true,action.getWork());
+ processes.add(p);
+ }
+ for (Process process : processes)
+ {
+ process.waitFor();
+ }
+ for (Process process : processes)
+ {
+ if(process.exitValue() == 9)
+ {
+ fail("failed see output");
+ }
+ }
+
+ }
+}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -14,7 +14,6 @@
package org.hornetq.tests.integration.cluster.distribution;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -26,14 +25,10 @@
import junit.framework.Assert;
import org.hornetq.api.core.Message;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
-import org.hornetq.core.config.BroadcastGroupConfiguration;
-import org.hornetq.core.config.ClusterConnectionConfiguration;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.DiscoveryGroupConfiguration;
+import org.hornetq.core.config.*;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
@@ -44,12 +39,13 @@
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
-import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
@@ -77,7 +73,7 @@
TransportConstants.DEFAULT_PORT + 8,
TransportConstants.DEFAULT_PORT + 9, };
- private static final long WAIT_TIMEOUT = 60000;
+ private static final long WAIT_TIMEOUT = 5000;
@Override
protected void setUp() throws Exception
@@ -94,6 +90,13 @@
sfs = new ClientSessionFactory[ClusterTestBase.MAX_SERVERS];
+ nodeManagers = new NodeManager[ClusterTestBase.MAX_SERVERS];
+
+ for (int i = 0, nodeManagersLength = nodeManagers.length; i < nodeManagersLength; i++)
+ {
+ nodeManagers[i] = new InVMNodeManager();
+ }
+
}
@Override
@@ -109,6 +112,8 @@
consumers = new ConsumerHolder[ClusterTestBase.MAX_CONSUMERS];
+ nodeManagers = null;
+
super.tearDown();
}
@@ -144,6 +149,8 @@
protected HornetQServer[] servers;
+ protected NodeManager[] nodeManagers;
+
protected ClientSessionFactory[] sfs;
protected ClientConsumer getConsumer(final int node)
@@ -1157,7 +1164,7 @@
}
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(serverTotc);
+ ServerLocator locator = HornetQClient.createServerLocatorWithHA(serverTotc);
locator.setRetryInterval(100);
locator.setRetryIntervalMultiplier(1d);
locator.setReconnectAttempts(-1);
@@ -1185,50 +1192,11 @@
protected void setupServer(final int node, final boolean fileStorage, final boolean netty)
{
- setupServer(node, fileStorage, netty, false, -1);
+ setupLiveServer(node, fileStorage, true, netty);
}
- protected void setupServer(final int node, final boolean fileStorage, final boolean netty, final boolean backup)
- {
- setupServer(node, fileStorage, netty, backup, -1);
- }
-
- protected void setupServer(final int node, final boolean fileStorage, final boolean netty, final boolean backup, final boolean useFakeLock)
- {
- setupServer(node, fileStorage, netty, backup, -1);
- }
-
- protected void setupServer(final int node, final boolean fileStorage, final boolean netty, final int backupNode)
- {
- setupServer(node, fileStorage, netty, false, backupNode, false);
- }
-
- protected void setupServer(final int node, final boolean fileStorage, final boolean netty, final int backupNode, final boolean useFakeLock)
- {
- setupServer(node, fileStorage, netty, false, backupNode, useFakeLock);
- }
-
- protected void setupServer(final int node,
+ /*protected void setupServer(final int node,
final boolean fileStorage,
- final boolean netty,
- final boolean backup,
- final int backupNode)
- {
- setupServer(node, fileStorage, netty, backup, backupNode, false);
- }
-
- protected void setupServer(final int node,
- final boolean fileStorage,
- final boolean netty,
- final boolean backup,
- final int backupNode,
- final boolean useFakeLock)
- {
- setupServer(node, fileStorage, true, netty, backup, backupNode, useFakeLock);
- }
-
- protected void setupServer(final int node,
- final boolean fileStorage,
final boolean sharedStorage,
final boolean netty,
final boolean backup,
@@ -1296,8 +1264,152 @@
}
}
servers[node] = server;
+ }*/
+
+ protected void setupLiveServer(final int node,
+ final boolean fileStorage,
+ final boolean sharedStorage,
+ final boolean netty)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
+
+ Configuration configuration = new ConfigurationImpl();
+
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+
+ configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node, netty)));
+
+ HornetQServer server;
+
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+ }
+ servers[node] = server;
+ }
+
+
+ protected void setupBackupServer(final int node,
+ final int liveNode,
+ final boolean fileStorage,
+ final boolean sharedStorage,
+ final boolean netty)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
+
+ Configuration configuration = new ConfigurationImpl();
+
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
+ configuration.setJournalDirectory(getJournalDir(liveNode, false));
+ configuration.setPagingDirectory(getPageDir(liveNode, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+ configuration.setBackup(true);
+
+ configuration.getAcceptorConfigurations().clear();
+ TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty));
+ configuration.getAcceptorConfigurations().add(acceptorConfig);
+ //add backup connector
+ TransportConfiguration liveConfig = createTransportConfiguration(netty, false, generateParams(liveNode, netty));
+ configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig);
+ TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
+ configuration.getConnectorConfigurations().put(backupConfig.getName(), backupConfig);
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(liveConfig.getName());
+ BackupConnectorConfiguration bcc = new BackupConnectorConfiguration(staticConnectors, backupConfig.getName());
+ configuration.setBackupConnectorConfiguration(bcc);
+
+ HornetQServer server;
+
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+ }
+ servers[node] = server;
}
-
protected void setupServerWithDiscovery(final int node,
final String groupAddress,
final int port,
@@ -1618,7 +1730,6 @@
ClusterTestBase.log.info("stopping server " + node);
servers[node].stop();
ClusterTestBase.log.info("server stopped");
- FakeLockFile.clearLocks(servers[node].getConfiguration().getJournalDirectory());
}
catch (Exception e)
{
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -115,14 +115,14 @@
protected void setupServers() throws Exception
{
// The backups
- setupServer(0, isFileStorage(), isNetty(), true, 3, true);
- setupServer(1, isFileStorage(), isNetty(), true, 4, true);
- setupServer(2, isFileStorage(), isNetty(), true, 5, true);
+ setupBackupServer(0, 3, isFileStorage(), true, isNetty());
+ setupBackupServer(1, 4, isFileStorage(), true, isNetty());
+ setupBackupServer(2, 5, isFileStorage(), true, isNetty());
// The lives
- setupServer(3, isFileStorage(), isNetty(), 0, true);
- setupServer(4, isFileStorage(), isNetty(), 1, true);
- setupServer(5, isFileStorage(), isNetty(), 2, true);
+ setupLiveServer(3, isFileStorage(), true, isNetty());
+ setupLiveServer(4, isFileStorage(), true, isNetty());
+ setupLiveServer(5, isFileStorage(), true, isNetty());
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -42,7 +42,7 @@
for (int i = 0; i < 50; i++)
{
System.out.println("\n\n" + i + "\n\n");
- testStartStopServers();
+ _testStartStopServers();
tearDown();
setUp();
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -1330,7 +1330,7 @@
verifyReceiveRoundRobinInSomeOrder(10, 1, 2, 3, 4);
}
- public void testStartStopServers() throws Exception
+ public void _testStartStopServers() throws Exception
{
doTestStartStopServers(1, 3000);
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -23,7 +23,6 @@
package org.hornetq.tests.integration.cluster.distribution;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.tests.util.UnitTestCase;
/**
@@ -260,8 +259,8 @@
closeAllSessionFactories();
}
- @Override
- public void testStartStopServers() throws Exception
+ //@Override
+ public void _testStartStopServers() throws Exception
{
setupCluster();
@@ -563,18 +562,18 @@
protected void setupServers() throws Exception
{
// The backups
- setupServer(5, isFileStorage(), isNetty(), true, 0, true);
- setupServer(6, isFileStorage(), isNetty(), true, 1, true);
- setupServer(7, isFileStorage(), isNetty(), true, 2, true);
- setupServer(8, isFileStorage(), isNetty(), true, 3, true);
- setupServer(9, isFileStorage(), isNetty(), true, 4, true);
+ setupBackupServer(5, 0, isFileStorage(), true, isNetty());
+ setupBackupServer(6, 1, isFileStorage(), true, isNetty());
+ setupBackupServer(7, 2, isFileStorage(), true, isNetty());
+ setupBackupServer(8, 3, isFileStorage(), true, isNetty());
+ setupBackupServer(9, 4, isFileStorage(), true, isNetty());
// The lives
- setupServer(0, isFileStorage(), isNetty(), 5, true);
- setupServer(1, isFileStorage(), isNetty(), 6, true);
- setupServer(2, isFileStorage(), isNetty(), 7, true);
- setupServer(3, isFileStorage(), isNetty(), 8, true);
- setupServer(4, isFileStorage(), isNetty(), 9, true);
+ setupLiveServer(0, isFileStorage(), true, isNetty());
+ setupLiveServer(1, isFileStorage(), true, isNetty());
+ setupLiveServer(2, isFileStorage(), true, isNetty());
+ setupLiveServer(3, isFileStorage(), true, isNetty());
+ setupLiveServer(4, isFileStorage(), true, isNetty());
}
@Override
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -96,7 +96,7 @@
* This is like testStopStartServers but we make sure we pause longer than discovery group timeout
* before restarting (5 seconds)
*/
- public void testStartStopServersWithPauseBeforeRestarting() throws Exception
+ public void _testStartStopServersWithPauseBeforeRestarting() throws Exception
{
doTestStartStopServers(10000, 3000);
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -23,11 +23,14 @@
package org.hornetq.tests.integration.cluster.failover;
import java.util.Map;
+import java.util.Set;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
+import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
import org.hornetq.tests.util.ServiceTestBase;
@@ -340,10 +343,18 @@
group.stop();
}
}
-
+ Set<RemotingConnection> connections = server.getRemotingService().getConnections();
+ for (RemotingConnection remotingConnection : connections)
+ {
+ remotingConnection.destroy();
+ server.getRemotingService().removeConnection(remotingConnection.getID());
+ }
+
+ ClusterManagerImpl clusterManager = (ClusterManagerImpl) server.getClusterManager();
+ clusterManager.clear();
//FailoverManagerImpl.failAllConnectionsForConnector(serverTC);
- server.stop();
+ server.kill();
}
public void testFailAllNodes() throws Exception
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -14,8 +14,6 @@
package org.hornetq.tests.integration.cluster.failover;
import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
@@ -26,15 +24,11 @@
import org.hornetq.api.core.*;
import org.hornetq.api.core.client.*;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
-import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.jms.client.HornetQTextMessage;
-import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.util.UnitTestCase;
/**
*
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -14,7 +14,6 @@
package org.hornetq.tests.integration.cluster.failover;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.HashMap;
@@ -42,7 +41,8 @@
import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
-import org.hornetq.core.server.cluster.impl.FakeLockFile;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -69,6 +69,7 @@
protected Configuration backupConfig;
protected Configuration liveConfig;
+ private NodeManager nodeManager;
// Static --------------------------------------------------------
@@ -97,7 +98,6 @@
{
super.setUp();
clearData();
- FakeLockFile.clearLocks();
createConfigs();
liveServer.start();
@@ -110,12 +110,12 @@
protected TestableServer createLiveServer()
{
- return new SameProcessHornetQServer(createFakeLockServer(true, liveConfig));
+ return new SameProcessHornetQServer(createInVMFailoverServer(true, liveConfig, nodeManager));
}
protected TestableServer createBackupServer()
{
- return new SameProcessHornetQServer(createFakeLockServer(true, backupConfig));
+ return new SameProcessHornetQServer(createInVMFailoverServer(true, backupConfig, nodeManager));
}
/**
@@ -123,6 +123,8 @@
*/
protected void createConfigs() throws Exception
{
+ nodeManager = new InVMNodeManager();
+
backupConfig = super.createDefaultConfig();
backupConfig.getAcceptorConfigurations().clear();
backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
@@ -217,6 +219,8 @@
liveServer = null;
+ nodeManager = null;
+
InVMConnector.failOnCreateConnection = false;
super.tearDown();
@@ -408,4 +412,6 @@
//To change body of implemented methods use File | Settings | File Templates.
}
}
+
+
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -87,6 +87,6 @@
@Override
void setupMasterServer(final int i, final boolean fileStorage, final boolean netty)
{
- setupServer(i, fileStorage, false, netty, false, 2, false);
+ setupLiveServer(i, fileStorage, false, netty);
}
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -34,7 +34,6 @@
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -63,7 +62,6 @@
{
super.setUp();
clearData();
- FakeLockFile.clearLocks();
}
// Package protected ---------------------------------------------
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -26,6 +26,8 @@
import org.hornetq.core.config.BackupConnectorConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
@@ -57,12 +59,14 @@
public void testMultipleFailovers2LiveServers() throws Exception
{
- createLiveConfig(0, 3, 4, 5);
- createBackupConfig(0, 1, true, new int[] {0, 2}, 3, 4, 5);
- createBackupConfig(0, 2, true, new int[] {0, 1}, 3, 4, 5);
- createLiveConfig(3, 0);
- createBackupConfig(3, 4, true, new int[] {3, 5}, 0, 1, 2);
- createBackupConfig(3, 5, true, new int[] {3, 4}, 0, 1, 2);
+ NodeManager nodeManager1 = new InVMNodeManager();
+ NodeManager nodeManager2 = new InVMNodeManager();
+ createLiveConfig(nodeManager1, 0, 3, 4, 5);
+ createBackupConfig(nodeManager1, 0, 1, true, new int[] {0, 2}, 3, 4, 5);
+ createBackupConfig(nodeManager1, 0, 2, true, new int[] {0, 1}, 3, 4, 5);
+ createLiveConfig(nodeManager2, 3, 0);
+ createBackupConfig(nodeManager2, 3, 4, true, new int[] {3, 5}, 0, 1, 2);
+ createBackupConfig(nodeManager2, 3, 5, true, new int[] {3, 4}, 0, 1, 2);
servers.get(0).start();
servers.get(3).start();
servers.get(1).start();
@@ -118,7 +122,7 @@
}
}
- protected void createBackupConfig(int liveNode, int nodeid, boolean createClusterConnections, int[] otherBackupNodes, int... otherClusterNodes)
+ protected void createBackupConfig(NodeManager nodeManager, int liveNode, int nodeid, boolean createClusterConnections, int[] otherBackupNodes, int... otherClusterNodes)
{
Configuration config1 = super.createDefaultConfig();
config1.getAcceptorConfigurations().clear();
@@ -155,10 +159,10 @@
config1.setPagingDirectory(config1.getPagingDirectory() + "_" + liveNode);
config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() + "_" + liveNode);
- servers.put(nodeid, new SameProcessHornetQServer(createFakeLockServer(true, config1)));
+ servers.put(nodeid, new SameProcessHornetQServer(createInVMFailoverServer(true, config1, nodeManager)));
}
- protected void createLiveConfig(int liveNode, int ... otherLiveNodes)
+ protected void createLiveConfig(NodeManager nodeManager, int liveNode, int ... otherLiveNodes)
{
TransportConfiguration liveConnector = createTransportConfiguration(isNetty(), false, generateParams(liveNode, isNetty()));
Configuration config0 = super.createDefaultConfig();
@@ -185,7 +189,7 @@
config0.setPagingDirectory(config0.getPagingDirectory() + "_" + liveNode);
config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() + "_" + liveNode);
- servers.put(liveNode, new SameProcessHornetQServer(createFakeLockServer(true, config0)));
+ servers.put(liveNode, new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager)));
}
protected boolean isNetty()
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -73,14 +73,14 @@
return true;
}
- @Override
+
protected void createLiveConfig(int liveNode, int... otherLiveNodes)
{
servers.put(liveNode, new RemoteProcessHornetQServer(lives.get(liveNode)));
}
- @Override
+
protected void createBackupConfig(int liveNode,
int nodeid,
boolean createClusterConnections,
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -267,9 +267,9 @@
{
super.setUp();
- setupServer(1, true, isShared(), true, false, -1, false);
- setupServer(2, true, isShared(), true, true, -1, false);
- setupServer(3, true, isShared(), true, true, 2, false);
+ setupLiveServer(1, true, isShared(), true);
+ setupBackupServer(2, 1, true, isShared(), true);
+ setupBackupServer(3, 1, true, isShared(), true);
setupClusterConnectionWithBackups("test", "test", false, 1, true, 1, new int[] { 3 }, new int[] { 2 });
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -29,6 +29,8 @@
import org.hornetq.core.config.BackupConnectorConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
@@ -38,14 +40,16 @@
{
protected Map<Integer, TestableServer> servers = new HashMap<Integer, TestableServer>();
+ private NodeManager nodeManager;
public void testMultipleFailovers() throws Exception
{
+ nodeManager = new InVMNodeManager();
createLiveConfig(0);
createBackupConfig(0, 1,false, 0, 2, 3, 4, 5);
createBackupConfig(0, 2,false, 0, 1, 3, 4, 5);
createBackupConfig(0, 3,false, 0, 1, 2, 4, 5);
- createBackupConfig(0, 4, false, 0, 1, 2, 3, 4);
+ createBackupConfig(0, 4, false, 0, 1, 2, 3, 5);
createBackupConfig(0, 5, false, 0, 1, 2, 3, 4);
servers.get(0).start();
servers.get(1).start();
@@ -128,7 +132,7 @@
config1.setPagingDirectory(config1.getPagingDirectory() + "_" + liveNode);
config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() + "_" + liveNode);
- servers.put(nodeid, new SameProcessHornetQServer(createFakeLockServer(true, config1)));
+ servers.put(nodeid, new SameProcessHornetQServer(createInVMFailoverServer(true, config1, nodeManager)));
}
protected void createLiveConfig(int liveNode, int ... otherLiveNodes)
@@ -140,7 +144,7 @@
config0.setSecurityEnabled(false);
config0.setSharedStore(true);
config0.setClustered(true);
- List<String> pairs = new ArrayList<String>();
+ List<String> pairs = null;
for (int node : otherLiveNodes)
{
TransportConfiguration otherLiveConnector = createTransportConfiguration(isNetty(), false, generateParams(node, isNetty()));
@@ -158,7 +162,7 @@
config0.setPagingDirectory(config0.getPagingDirectory() + "_" + liveNode);
config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() + "_" + liveNode);
- servers.put(liveNode, new SameProcessHornetQServer(createFakeLockServer(true, config0)));
+ servers.put(liveNode, new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager)));
}
protected boolean isNetty()
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -94,14 +94,14 @@
protected void setupServers() throws Exception
{
// The backups
- setupServer(3, isFileStorage(), isNetty(), true);
- setupServer(4, isFileStorage(), isNetty(), true);
- setupServer(5, isFileStorage(), isNetty(), true);
+ setupBackupServer(3, 0, isFileStorage(), true, isNetty());
+ setupBackupServer(4, 1, isFileStorage(), true, isNetty());
+ setupBackupServer(5, 2, isFileStorage(), true, isNetty());
// The lives
- setupServer(0, isFileStorage(), isNetty(), 3);
- setupServer(1, isFileStorage(), isNetty(), 4);
- setupServer(2, isFileStorage(), isNetty(), 5);
+ setupLiveServer(0, isFileStorage(), true, isNetty());
+ setupLiveServer(1, isFileStorage(), true, isNetty());
+ setupLiveServer(2, isFileStorage(), true, isNetty());
}
// Package protected ---------------------------------------------
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -13,7 +13,6 @@
package org.hornetq.tests.integration.cluster.util;
-import java.io.File;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -93,12 +92,7 @@
ClusterManagerImpl clusterManager = (ClusterManagerImpl) server.getClusterManager();
clusterManager.clear();
- server.stop();
- // recreate the live.lock file (since it was deleted by the
- // clean stop
- File lockFile = new File(server.getConfiguration().getJournalDirectory(), "live.lock");
- Assert.assertFalse(lockFile.exists());
- lockFile.createNewFile();
+ server.kill();
// Wait to be informed of failure
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -28,11 +28,10 @@
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.naming.NamingException;
+import javax.management.MBeanServer;
import junit.framework.Assert;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
@@ -46,17 +45,18 @@
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
-import org.hornetq.core.server.cluster.impl.FakeLockFile;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQSession;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.tests.integration.jms.server.management.JMSUtil;
import org.hornetq.tests.unit.util.InVMContext;
-import org.hornetq.tests.util.FakeLockHornetQServer;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
@@ -311,7 +311,6 @@
protected void setUp() throws Exception
{
super.setUp();
- FakeLockFile.clearLocks();
startServers();
}
@@ -320,6 +319,7 @@
*/
protected void startServers() throws Exception
{
+ NodeManager nodeManager = new InVMNodeManager();
backuptc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
backupParams);
livetc = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory");
@@ -353,7 +353,7 @@
backupConf.setLargeMessagesDirectory(getLargeMessagesDir());
backupConf.setPersistenceEnabled(true);
backupConf.setClustered(true);
- backupService = new FakeLockHornetQServer(backupConf);
+ backupService = new InVMNodeManagerServer(backupConf, nodeManager);
backupJMSService = new JMSServerManagerImpl(backupService);
@@ -381,7 +381,7 @@
liveConf.getConnectorConfigurations().put(livetc.getName(), livetc);
liveConf.setPersistenceEnabled(true);
liveConf.setClustered(true);
- liveService = new FakeLockHornetQServer(liveConf);
+ liveService = new InVMNodeManagerServer(liveConf, nodeManager);
liveJMSService = new JMSServerManagerImpl(liveService);
@@ -432,4 +432,47 @@
}
}
+
+ // Inner classes -------------------------------------------------
+ class InVMNodeManagerServer extends HornetQServerImpl
+ {
+ final NodeManager nodeManager;
+ public InVMNodeManagerServer(NodeManager nodeManager)
+ {
+ super();
+ this.nodeManager = nodeManager;
+ }
+
+ public InVMNodeManagerServer(Configuration configuration, NodeManager nodeManager)
+ {
+ super(configuration);
+ this.nodeManager = nodeManager;
+ }
+
+ public InVMNodeManagerServer(Configuration configuration, MBeanServer mbeanServer, NodeManager nodeManager)
+ {
+ super(configuration, mbeanServer);
+ this.nodeManager = nodeManager;
+ }
+
+ public InVMNodeManagerServer(Configuration configuration, HornetQSecurityManager securityManager, NodeManager nodeManager)
+ {
+ super(configuration, securityManager);
+ this.nodeManager = nodeManager;
+ }
+
+ public InVMNodeManagerServer(Configuration configuration, MBeanServer mbeanServer, HornetQSecurityManager securityManager, NodeManager nodeManager)
+ {
+ super(configuration, mbeanServer, securityManager);
+ this.nodeManager = nodeManager;
+ }
+
+ @Override
+ protected NodeManager createNodeManager(String directory)
+ {
+ return nodeManager;
+ }
+
+ }
+
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/TopicClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/TopicClusterTest.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/cluster/TopicClusterTest.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -83,7 +83,7 @@
// topic1 and 2 should be the same.
// Using a different instance here just to make sure it is implemented correctly
MessageConsumer cons2 = session2.createDurableSubscriber(topic2, "sub2");
-
+ Thread.sleep(2000);
MessageProducer prod1 = session1.createProducer(topic1);
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -228,7 +228,7 @@
ClusterManagerImpl clusterManager = (ClusterManagerImpl) server.getClusterManager();
clusterManager.clear();
- server.stop();
+ server.kill();
// recreate the live.lock file (since it was deleted by the
// clean stop
File lockFile = new File(server.getConfiguration().getJournalDirectory(), "live.lock");
Deleted: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/FakeLockHornetQServer.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -1,60 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.util;
-
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.server.cluster.LockFile;
-import org.hornetq.core.server.cluster.impl.FakeLockFile;
-import org.hornetq.core.server.impl.HornetQServerImpl;
-import org.hornetq.spi.core.security.HornetQSecurityManager;
-
-import javax.management.MBeanServer;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * Created Jul 23, 2010
- */
-public class FakeLockHornetQServer extends HornetQServerImpl
-{
- public FakeLockHornetQServer()
- {
- super(); //To change body of overridden methods use File | Settings | File Templates.
- }
-
- public FakeLockHornetQServer(Configuration configuration)
- {
- super(configuration); //To change body of overridden methods use File | Settings | File Templates.
- }
-
- public FakeLockHornetQServer(Configuration configuration, MBeanServer mbeanServer)
- {
- super(configuration, mbeanServer); //To change body of overridden methods use File | Settings | File Templates.
- }
-
- public FakeLockHornetQServer(Configuration configuration, HornetQSecurityManager securityManager)
- {
- super(configuration, securityManager); //To change body of overridden methods use File | Settings | File Templates.
- }
-
- public FakeLockHornetQServer(Configuration configuration, MBeanServer mbeanServer, HornetQSecurityManager securityManager)
- {
- super(configuration, mbeanServer, securityManager); //To change body of overridden methods use File | Settings | File Templates.
- }
-
- @Override
- protected LockFile createLockFile(String fileName, String directory)
- {
- return new FakeLockFile(fileName, directory);
- }
-}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -133,7 +133,7 @@
List<String> toOtherServerPair = new ArrayList<String>();
toOtherServerPair.add("toServer1");
- Configuration conf2 = createDefaultConfig(1, generateInVMParams(2), InVMAcceptorFactory.class.getCanonicalName());
+ Configuration conf2 = createDefaultConfig(2, generateInVMParams(2), InVMAcceptorFactory.class.getCanonicalName());
conf2.setSecurityEnabled(false);
conf2.setJMXManagementEnabled(true);
conf2.setPersistenceEnabled(false);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -38,6 +38,8 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.jms.client.HornetQBytesMessage;
import org.hornetq.jms.client.HornetQTextMessage;
@@ -206,47 +208,39 @@
return createServer(realFiles, configuration, -1, -1, new HashMap<String, AddressSettings>());
}
- protected HornetQServer createFakeLockServer(final boolean realFiles)
+ protected HornetQServer createInVMFailoverServer(final boolean realFiles, final Configuration configuration, NodeManager nodeManager)
{
- return createFakeLockServer(realFiles, false);
+ return createInVMFailoverServer(realFiles, configuration, -1, -1, new HashMap<String, AddressSettings>(), nodeManager);
}
- protected HornetQServer createFakeLockServer(final boolean realFiles, final boolean netty)
- {
- return createFakeLockServer(realFiles, createDefaultConfig(netty), -1, -1, new HashMap<String, AddressSettings>());
- }
+ protected HornetQServer createInVMFailoverServer(final boolean realFiles,
+ final Configuration configuration,
+ final int pageSize,
+ final int maxAddressSize,
+ final Map<String, AddressSettings> settings,
+ NodeManager nodeManager)
+ {
+ HornetQServer server;
+ HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
+ configuration.setPersistenceEnabled(realFiles);
+ server = new InVMNodeManagerServer(configuration,ManagementFactory.getPlatformMBeanServer(),securityManager, nodeManager);
- protected HornetQServer createFakeLockServer(final boolean realFiles, final Configuration configuration)
- {
- return createFakeLockServer(realFiles, configuration, -1, -1, new HashMap<String, AddressSettings>());
- }
- protected HornetQServer createFakeLockServer(final boolean realFiles,
- final Configuration configuration,
- final int pageSize,
- final int maxAddressSize,
- final Map<String, AddressSettings> settings)
- {
- HornetQServer server;
- HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
- configuration.setPersistenceEnabled(realFiles);
- server = new FakeLockHornetQServer(configuration,ManagementFactory.getPlatformMBeanServer(),securityManager);
+ for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+ {
+ server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+ }
+ AddressSettings defaultSetting = new AddressSettings();
+ defaultSetting.setPageSizeBytes(pageSize);
+ defaultSetting.setMaxSizeBytes(maxAddressSize);
- for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
- {
- server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ return server;
}
- AddressSettings defaultSetting = new AddressSettings();
- defaultSetting.setPageSizeBytes(pageSize);
- defaultSetting.setMaxSizeBytes(maxAddressSize);
- server.getAddressSettingsRepository().addMatch("#", defaultSetting);
-
- return server;
- }
-
protected HornetQServer createServer(final boolean realFiles,
final Configuration configuration,
final HornetQSecurityManager securityManager)
@@ -509,5 +503,44 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+ class InVMNodeManagerServer extends HornetQServerImpl
+ {
+ final NodeManager nodeManager;
+ public InVMNodeManagerServer(NodeManager nodeManager)
+ {
+ super();
+ this.nodeManager = nodeManager;
+ }
+ public InVMNodeManagerServer(Configuration configuration, NodeManager nodeManager)
+ {
+ super(configuration);
+ this.nodeManager = nodeManager;
+ }
+
+ public InVMNodeManagerServer(Configuration configuration, MBeanServer mbeanServer, NodeManager nodeManager)
+ {
+ super(configuration, mbeanServer);
+ this.nodeManager = nodeManager;
+ }
+
+ public InVMNodeManagerServer(Configuration configuration, HornetQSecurityManager securityManager, NodeManager nodeManager)
+ {
+ super(configuration, securityManager);
+ this.nodeManager = nodeManager;
+ }
+
+ public InVMNodeManagerServer(Configuration configuration, MBeanServer mbeanServer, HornetQSecurityManager securityManager, NodeManager nodeManager)
+ {
+ super(configuration, mbeanServer, securityManager);
+ this.nodeManager = nodeManager;
+ }
+
+ @Override
+ protected NodeManager createNodeManager(String directory)
+ {
+ return nodeManager;
+ }
+
+ }
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-10-20 03:05:39 UTC (rev 9797)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-10-20 08:17:20 UTC (rev 9798)
@@ -65,7 +65,6 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.jms.client.HornetQTextMessage;
13 years, 6 months
JBoss hornetq SVN: r9797 - in branches/hornetq-416/src/main/org/hornetq: jms/client and 1 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-10-19 23:05:39 -0400 (Tue, 19 Oct 2010)
New Revision: 9797
Modified:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java
branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
Log:
add metadata to every session, not depending on the initial session
Modified: branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-10-19 23:58:09 UTC (rev 9796)
+++ branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-10-20 03:05:39 UTC (rev 9797)
@@ -251,6 +251,4 @@
*/
@Operation(desc = "List all JMS consumers associated to a JMS Connection")
String listConsumersAsJSON(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID) throws Exception;
-
-
}
Modified: branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-10-19 23:58:09 UTC (rev 9796)
+++ branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-10-20 03:05:39 UTC (rev 9797)
@@ -183,11 +183,11 @@
this.clientID = clientID;
try
{
- initialSession.addMetaData("jms-client-id", clientID);
+ this.addSessionMetaData(initialSession);
}
catch (HornetQException e)
{
- JMSException ex = new JMSException("Internal erro setting metadata jms-client-id");
+ JMSException ex = new JMSException("Internal error setting metadata jms-client-id");
ex.setLinkedException(e);
throw ex;
}
@@ -547,6 +547,8 @@
{
session.start();
}
+
+ this.addSessionMetaData(session);
return jbs;
}
@@ -571,13 +573,9 @@
try
{
initialSession = sessionFactory.createSession(username, password, false, false, false, false, 0);
- //mark it is a jms initial session
- initialSession.addMetaData("jms-initial-session", "");
- if (clientID != null)
- {
- initialSession.addMetaData("jms-client-id", clientID);
- }
+ addSessionMetaData(initialSession);
+
initialSession.addFailureListener(listener);
}
catch (HornetQException me)
@@ -586,6 +584,14 @@
}
}
+ private void addSessionMetaData(ClientSession session) throws HornetQException
+ {
+ if (clientID != null)
+ {
+ session.addMetaData("jms-client-id", clientID);
+ }
+ }
+
// Inner classes --------------------------------------------------------------------------------
private static class JMSFailureListener implements SessionFailureListener
Modified: branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-19 23:58:09 UTC (rev 9796)
+++ branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-20 03:05:39 UTC (rev 9797)
@@ -740,13 +740,13 @@
Set<ServerSession> sessions = server.getHornetQServer().getSessions();
- Map<Object, ServerSession> initialSessions = new HashMap<Object, ServerSession>();
+ Map<Object, ServerSession> jmsSessions = new HashMap<Object, ServerSession>();
for (ServerSession session : sessions)
{
- if (session.getMetaData("jms-initial-session") != null)
+ if (session.getMetaData("jms-client-id") != null)
{
- initialSessions.put(session.getConnectionID(), session);
+ jmsSessions.put(session.getConnectionID(), session);
}
}
@@ -756,8 +756,8 @@
obj.put("connectionID", connection.getID().toString());
obj.put("clientAddress", connection.getRemoteAddress());
obj.put("creationTime", connection.getCreationTime());
- obj.put("clientID", initialSessions.get(connection.getID()).getMetaData("jms-client-id"));
- obj.put("principal", initialSessions.get(connection.getID()).getUsername());
+ obj.put("clientID", jmsSessions.get(connection.getID()).getMetaData("jms-client-id"));
+ obj.put("principal", jmsSessions.get(connection.getID()).getUsername());
array.put(obj);
}
return array.toString();
13 years, 6 months
JBoss hornetq SVN: r9796 - branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-19 19:58:09 -0400 (Tue, 19 Oct 2010)
New Revision: 9796
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
Log:
Improving delete process
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-19 21:28:15 UTC (rev 9795)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-19 23:58:09 UTC (rev 9796)
@@ -416,7 +416,12 @@
public void printDebug()
{
- System.out.println("Debug information on PageCurorImpl- " + this);
+ printDebug(this.toString());
+ }
+
+ public void printDebug(String msg)
+ {
+ System.out.println("Debug information on PageCurorImpl- " + msg);
for (PageCursorInfo info : consumedPages.values())
{
System.out.println(info);
@@ -540,7 +545,8 @@
{
for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
{
- if (entry.getValue().isDone())
+ PageCursorInfo info = entry.getValue();
+ if (info.isDone() && !info.isPendingDelete())
{
if (entry.getKey() == lastAckedPosition.getPageNr())
{
@@ -548,6 +554,7 @@
}
else
{
+ info.setPendingDelete();
completedPages.add(entry.getValue());
}
}
@@ -592,11 +599,14 @@
{
PageCursorImpl.trace("Removing page " + completePage.getPageId());
}
- trace("Removing page " + completePage.getPageId());
- consumedPages.remove(completePage.getPageId());
- }
+ if (consumedPages.remove(completePage.getPageId()) == null)
+ {
+ log.warn("Couldn't remove page " + completePage.getPageId() + " from consumed pages on cursor for address " + pageStore.getAddress());
+ }
+ }
}
+
cursorProvider.scheduleCleanup();
}
});
@@ -624,13 +634,18 @@
// The page was live at the time of the creation
private final boolean wasLive;
+ // There's a pending delete on the async IO pipe
+ // We're holding this object to avoid delete the pages before the IO is complete,
+ // however we can't delete these records again
+ private boolean pendingDelete;
+
// We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or
// expressions
private final AtomicInteger confirmed = new AtomicInteger(0);
public String toString()
{
- return "PageCursorInfo::PaeID=" + pageId + " numberOfMessage = " + numberOfMessages;
+ return "PageCursorInfo::PageID=" + pageId + " numberOfMessage = " + numberOfMessages;
}
public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache)
@@ -648,6 +663,16 @@
{
return getNumberOfMessages() == confirmed.get();
}
+
+ public boolean isPendingDelete()
+ {
+ return pendingDelete;
+ }
+
+ public void setPendingDelete()
+ {
+ this.pendingDelete = true;
+ }
/**
* @return the pageId
13 years, 6 months
JBoss hornetq SVN: r9795 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-19 17:28:15 -0400 (Tue, 19 Oct 2010)
New Revision: 9795
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Implementing cleanup after closing a cursor
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-19 21:28:15 UTC (rev 9795)
@@ -29,8 +29,15 @@
// Cursor query operations --------------------------------------
+ // To be called before the server is down
void stop();
+ /** It will be 0 if non persistent cursor */
+ public long getId();
+
+ // To be called when the cursor is closed for good. Most likely when the queue is deleted
+ void close() throws Exception;
+
Pair<PagePosition, PagedMessage> moveNext() throws Exception;
void ack(PagePosition position) throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-19 21:28:15 UTC (rev 9795)
@@ -66,6 +66,15 @@
void scheduleCleanup();
+ /**
+ * @param pageCursorImpl
+ */
+ void close(PageCursor pageCursorImpl);
+
+ // to be used on tests -------------------------------------------
+
+ int getCacheSize();
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-19 21:28:15 UTC (rev 9795)
@@ -24,6 +24,7 @@
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.Pair;
@@ -107,6 +108,11 @@
// Public --------------------------------------------------------
+ public PageCursorProvider getProvider()
+ {
+ return this.cursorProvider;
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
@@ -196,10 +202,9 @@
*/
public long getFirstPage()
{
- Long firstKey = consumedPages.firstKey();
- if (firstKey == null)
+ if (consumedPages.isEmpty())
{
- return Long.MAX_VALUE;
+ return 0;
}
else
{
@@ -221,7 +226,6 @@
*/
public void reloadACK(final PagePosition position)
{
- System.out.println("reloading " + position);
if (recoveredACK == null)
{
recoveredACK = new LinkedList<PagePosition>();
@@ -246,8 +250,95 @@
{
processACK(position);
}
+
+
+ /**
+ * All the data associated with the cursor should go away here
+ */
+ public void close() throws Exception
+ {
+ final long tx = store.generateUniqueID();
+
+ final ArrayList<Exception> ex = new ArrayList<Exception>();
+
+ final AtomicBoolean isPersistent = new AtomicBoolean(false);
+
+ // We can't delete the records at the caller's thread
+ // because an executor may be holding the synchronized on PageCursorImpl
+ // what would lead to a dead lock
+ // so, we delete it inside the executor also
+ // and wait for the result
+ // The caller will be treating eventual IO exceptions and dispatching to the original thread's caller
+ executor.execute(new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ synchronized (PageCursorImpl.this)
+ {
+ for (PageCursorInfo cursor : consumedPages.values())
+ {
+ for (PagePosition info : cursor.acks)
+ {
+ if (info.getRecordID() != 0)
+ {
+ isPersistent.set(true);
+ store.deleteCursorAcknowledgeTransactional(tx, info.getRecordID());
+ }
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ ex.add(e);
+ log.warn(e.getMessage(), e);
+ }
+ }
+ });
+
+ Future future = new Future();
+
+ executor.execute(future);
+
+ while (!future.await(5000))
+ {
+ log.warn("Timeout on waiting cursor " + this + " to be closed");
+ }
+
+
+ if (isPersistent.get())
+ {
+ // Another reason to perform the commit at the main thread is because the OperationContext may only send the result to the client when
+ // the IO on commit is done
+ if (ex.size() == 0)
+ {
+ store.commit(tx);
+ }
+ else
+ {
+ store.rollback(tx);
+ throw ex.get(0);
+ }
+ }
+
+ cursorProvider.close(this);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#getId()
+ */
+ public long getId()
+ {
+ return cursorId;
+ }
+
+
+
public void processReload() throws Exception
{
if (recoveredACK != null)
@@ -317,7 +408,10 @@
{
Future future = new Future();
executor.execute(future);
- future.await(1000);
+ while (!future.await(1000))
+ {
+ log.warn("Waiting page cursor to finish executors - " + this);
+ }
}
public void printDebug()
@@ -340,7 +434,6 @@
if (pageInfo == null)
{
PageCache cache = cursorProvider.getPageCache(pos);
- System.out.println("Number of Messages = " + cache.getNumberOfMessages());
pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(), cache);
consumedPages.put(pos.getPageNr(), pageInfo);
}
@@ -451,7 +544,7 @@
{
if (entry.getKey() == lastAckedPosition.getPageNr())
{
- System.out.println("We can't clear page " + entry.getKey() + " now since it's the current page");
+ trace("We can't clear page " + entry.getKey() + " now since it's the current page");
}
else
{
@@ -499,7 +592,7 @@
{
PageCursorImpl.trace("Removing page " + completePage.getPageId());
}
- System.out.println("Removing page " + completePage.getPageId());
+ trace("Removing page " + completePage.getPageId());
consumedPages.remove(completePage.getPageId());
}
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-19 21:28:15 UTC (rev 9795)
@@ -33,6 +33,7 @@
import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.ConcurrentSet;
import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.Future;
import org.hornetq.utils.SoftValueHashMap;
import org.jboss.netty.util.internal.ConcurrentHashMap;
@@ -67,7 +68,7 @@
private SoftValueHashMap<Long, PageCache> softCache = new SoftValueHashMap<Long, PageCache>();
private ConcurrentMap<Long, PageCursor> activeCursors = new ConcurrentHashMap<Long, PageCursor>();
-
+
private ConcurrentSet<PageCursor> nonPersistentCursors = new ConcurrentHashSet<PageCursor>();
// Static --------------------------------------------------------
@@ -83,7 +84,7 @@
this.storageManager = storageManager;
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
- }
+ }
// Public --------------------------------------------------------
@@ -251,14 +252,42 @@
cursor.stop();
}
- activeCursors.clear();
+ for (PageCursor cursor : nonPersistentCursors)
+ {
+ cursor.stop();
+ }
+
+ Future future = new Future();
+
+ executor.execute(future);
+
+ while (!future.await(10000))
+ {
+ log.warn("Waiting cursor provider " + this + " to finish executors");
+ }
+
}
+ public void close(PageCursor cursor)
+ {
+ if (cursor.getId() != 0)
+ {
+ activeCursors.remove(cursor.getId());
+ }
+ else
+ {
+ nonPersistentCursors.remove(cursor);
+ }
+
+ scheduleCleanup();
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#scheduleCleanup()
*/
public void scheduleCleanup()
{
+
executor.execute(new Runnable()
{
public void run()
@@ -274,17 +303,18 @@
try
{
- System.out.println("MinPage = " + minPage + " firstPage = " + pagingStore.getFirstPage());
for (long i = pagingStore.getFirstPage(); i < minPage; i++)
{
Page page = pagingStore.depage();
- System.out.println("Deleting files associated with page " + page);
- page.delete();
+ if (page != null)
+ {
+ page.delete();
+ }
}
}
- catch (Exception e)
+ catch (Exception ex)
{
- log.warn("Couldn't complete cleanup on paging", e);
+ log.warn("Couldn't complete cleanup on paging", ex);
}
}
@@ -316,15 +346,13 @@
cursorList.addAll(activeCursors.values());
cursorList.addAll(nonPersistentCursors);
}
-
+
if (cursorList.size() == 0)
{
return 0l;
}
long minPage = Long.MAX_VALUE;
-
- System.out.println("CursorList : " + cursorList.size());
for (PageCursor cursor : cursorList)
{
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-19 21:28:15 UTC (rev 9795)
@@ -56,6 +56,7 @@
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.Future;
/**
*
@@ -210,6 +211,11 @@
}
// Public --------------------------------------------------------
+
+ public String toString()
+ {
+ return "PagingStoreImpl(" + this.address + ")";
+ }
// PagingStore implementation ------------------------------------
@@ -393,30 +399,26 @@
{
if (running)
{
+
+ cursorProvider.stop();
+
running = false;
- final CountDownLatch latch = new CountDownLatch(1);
+ Future future = new Future();
- executor.execute(new Runnable()
- {
- public void run()
- {
- latch.countDown();
- }
- });
+ executor.execute(future);
- if (!latch.await(60, TimeUnit.SECONDS))
+ if (!future.await(60000))
{
PagingStoreImpl.log.warn("Timed out on waiting PagingStore " + address + " to shutdown");
}
+
if (currentPage != null)
{
currentPage.close();
currentPage = null;
}
-
- cursorProvider.stop();
}
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-19 21:28:15 UTC (rev 9795)
@@ -470,6 +470,51 @@
// Validate the pages are being cleared (with multiple cursors)
}
+
+ public void testCloseNonPersistentConsumer() throws Exception
+ {
+
+ final int NUM_MESSAGES = 100;
+
+ int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+ System.out.println("NumberOfPages = " + numberOfPages);
+
+ PageCursorProvider cursorProvider = lookupPageStore(ADDRESS).getCursorProvier();
+
+ PageCursor cursor = cursorProvider.createNonPersistentCursor();
+ PageCursorImpl cursor2 = (PageCursorImpl)cursorProvider.createNonPersistentCursor();
+
+ Pair<PagePosition, PagedMessage> msg;
+
+ int key = 0;
+ while ((msg = cursor.moveNext()) != null)
+ {
+ assertEquals(key++, msg.b.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msg.a);
+ }
+ assertEquals(NUM_MESSAGES, key);
+
+
+ forceGC();
+
+ assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+
+ for (int i = 0 ; i < 10; i++)
+ {
+ msg = cursor2.moveNext();
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+ }
+
+ assertSame(cursor2.getProvider(), cursorProvider);
+
+ cursor2.close();
+
+ server.stop();
+
+ }
+
+
public void testLeavePageStateAndRestart() throws Exception
{
// Validate the cursor are working fine when all the pages are gone, and then paging being restarted
13 years, 6 months
JBoss hornetq SVN: r9794 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-16 00:00:15 -0400 (Sat, 16 Oct 2010)
New Revision: 9794
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Cleanup on paging
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-16 04:00:15 UTC (rev 9794)
@@ -70,7 +70,20 @@
PageCursorProvider getCursorProvier();
void processReload() throws Exception;
+
+ /**
+ * Remove the first page from the Writing Queue.
+ * The file will still exist until Page.delete is called,
+ * So, case the system is reloaded the same Page will be loaded back if delete is not called.
+ *
+ * @throws Exception
+ *
+ * Note: This should still be part of the interface, even though HornetQ only uses through the
+ */
+ Page depage() throws Exception;
+
+
/**
* @return false if a thread was already started, or if not in page mode
* @throws Exception
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java 2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java 2010-10-16 04:00:15 UTC (rev 9794)
@@ -26,6 +26,8 @@
public interface PageCache
{
Page getPage();
+
+ long getPageId();
int getNumberOfMessages();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-16 04:00:15 UTC (rev 9794)
@@ -15,7 +15,6 @@
import org.hornetq.api.core.Pair;
import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
/**
@@ -37,6 +36,11 @@
void ack(PagePosition position) throws Exception;
void ackTx(Transaction tx, PagePosition position) throws Exception;
+ /**
+ *
+ * @return the first page in use or MAX_LONG if none is in use
+ */
+ long getFirstPage();
// Reload operations
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-16 04:00:15 UTC (rev 9794)
@@ -16,7 +16,6 @@
import org.hornetq.api.core.Pair;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.server.ServerMessage;
/**
* The provider of Cursor for a given Address
@@ -49,13 +48,13 @@
* @param queueId The cursorID should be the same as the queueId associated for persistance
* @return
*/
- PageCursor getCursor(long queueId);
+ PageCursor getPersistentCursor(long queueId);
/**
* Create a non persistent cursor, usually associated with browsing
* @return
*/
- PageCursor createCursor();
+ PageCursor createNonPersistentCursor();
Pair<PagePosition, PagedMessage> getNext(PageCursor cursor, PagePosition pos) throws Exception;
@@ -65,6 +64,8 @@
void stop();
+ void scheduleCleanup();
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2010-10-16 04:00:15 UTC (rev 9794)
@@ -63,6 +63,11 @@
{
return page;
}
+
+ public long getPageId()
+ {
+ return page.getPageId();
+ }
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCache#getNumberOfMessages()
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-16 04:00:15 UTC (rev 9794)
@@ -81,6 +81,11 @@
lock.readLock().unlock();
}
}
+
+ public long getPageId()
+ {
+ return page.getPageId();
+ }
public void lock()
{
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-16 04:00:15 UTC (rev 9794)
@@ -189,8 +189,26 @@
installTXCallback(tx, position);
}
+
/* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#getFirstPage()
+ */
+ public long getFirstPage()
+ {
+ Long firstKey = consumedPages.firstKey();
+ if (firstKey == null)
+ {
+ return Long.MAX_VALUE;
+ }
+ else
+ {
+ return consumedPages.firstKey();
+ }
+ }
+
+
+ /* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
*/
public synchronized void redeliver(final PagePosition position)
@@ -348,6 +366,11 @@
{
if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
{
+ if (lastAckedPosition != null && lastAckedPosition.getPageNr() != pos.getPageNr())
+ {
+ // there's a different page being acked, we will do the check right away
+ scheduleCleanupCheck();
+ }
this.lastAckedPosition = pos;
}
PageCursorInfo info = getPageInfo(pos);
@@ -386,6 +409,11 @@
*/
private void onPageDone(final PageCursorInfo info)
{
+ scheduleCleanupCheck();
+ }
+
+ private void scheduleCleanupCheck()
+ {
executor.execute(new Runnable()
{
@@ -458,18 +486,27 @@
@Override
public void afterCommit(final Transaction tx)
{
- synchronized (PageCursorImpl.this)
+ executor.execute(new Runnable()
{
- for (PageCursorInfo completePage : completedPages)
+
+ public void run()
{
- if (isTrace)
+ synchronized (PageCursorImpl.this)
{
- PageCursorImpl.trace("Removing page " + completePage.getPageId());
+ for (PageCursorInfo completePage : completedPages)
+ {
+ if (isTrace)
+ {
+ PageCursorImpl.trace("Removing page " + completePage.getPageId());
+ }
+ System.out.println("Removing page " + completePage.getPageId());
+ consumedPages.remove(completePage.getPageId());
+ }
}
- System.out.println("Removing page " + completePage.getPageId());
- consumedPages.remove(completePage.getPageId());
+
+ cursorProvider.scheduleCleanup();
}
- }
+ });
}
});
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-16 04:00:15 UTC (rev 9794)
@@ -13,8 +13,10 @@
package org.hornetq.core.paging.cursor.impl;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
import org.hornetq.api.core.Pair;
import org.hornetq.core.logging.Logger;
@@ -28,6 +30,8 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.utils.ConcurrentHashSet;
+import org.hornetq.utils.ConcurrentSet;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.SoftValueHashMap;
import org.jboss.netty.util.internal.ConcurrentHashMap;
@@ -51,16 +55,20 @@
// Attributes ----------------------------------------------------
private final PagingStore pagingStore;
-
+
private final PagingManager pagingManager;
private final StorageManager storageManager;
private final ExecutorFactory executorFactory;
+ private final Executor executor;
+
private SoftValueHashMap<Long, PageCache> softCache = new SoftValueHashMap<Long, PageCache>();
private ConcurrentMap<Long, PageCursor> activeCursors = new ConcurrentHashMap<Long, PageCursor>();
+
+ private ConcurrentSet<PageCursor> nonPersistentCursors = new ConcurrentHashSet<PageCursor>();
// Static --------------------------------------------------------
@@ -74,6 +82,7 @@
this.pagingManager = pagingStore.getPagingManager();
this.storageManager = storageManager;
this.executorFactory = executorFactory;
+ this.executor = executorFactory.getExecutor();
}
// Public --------------------------------------------------------
@@ -86,7 +95,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
*/
- public PageCursor getCursor(long cursorID)
+ public PageCursor getPersistentCursor(long cursorID)
{
PageCursor activeCursor = activeCursors.get(cursorID);
if (activeCursor == null)
@@ -105,9 +114,11 @@
/**
* this will create a non-persistent cursor
*/
- public PageCursor createCursor()
+ public PageCursor createNonPersistentCursor()
{
- return new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), 0);
+ PageCursor cursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), 0);
+ nonPersistentCursors.add(cursor);
+ return cursor;
}
/* (non-Javadoc)
@@ -116,16 +127,15 @@
public Pair<PagePosition, PagedMessage> getNext(final PageCursor cursor, PagePosition cursorPos) throws Exception
{
- while(true)
+ while (true)
{
Pair<PagePosition, PagedMessage> retPos = internalAfter(cursorPos);
-
+
if (retPos == null)
{
return null;
}
- else
- if (retPos != null)
+ else if (retPos != null)
{
cursorPos = retPos.a;
if (retPos.b.getTransactionID() != 0)
@@ -133,7 +143,9 @@
PageTransactionInfo tx = pagingManager.getTransaction(retPos.b.getTransactionID());
if (tx == null)
{
- log.warn("Couldn't locate page transaction " + retPos.b.getTransactionID() + ", ignoring message on position " + retPos.a);
+ log.warn("Couldn't locate page transaction " + retPos.b.getTransactionID() +
+ ", ignoring message on position " +
+ retPos.a);
cursor.positionIgnored(cursorPos);
}
else
@@ -151,7 +163,7 @@
}
}
}
-
+
private Pair<PagePosition, PagedMessage> internalAfter(final PagePosition pos)
{
PagePosition retPos = pos.nextMessage();
@@ -174,9 +186,9 @@
return null;
}
}
-
+
PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
-
+
if (serverMessage != null)
{
return new Pair<PagePosition, PagedMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
@@ -213,11 +225,10 @@
}
return cache;
}
-
+
public synchronized void addPageCache(PageCache cache)
{
- // TODO: remove the type cast here
- softCache.put((long)cache.getPage().getPageId(), cache);
+ softCache.put(cache.getPageId(), cache);
}
public synchronized int getCacheSize()
@@ -233,20 +244,53 @@
}
}
-
public void stop()
{
for (PageCursor cursor : activeCursors.values())
{
cursor.stop();
}
-
+
activeCursors.clear();
}
-
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursorProvider#scheduleCleanup()
+ */
+ public void scheduleCleanup()
+ {
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ cleanup();
+ }
+ });
+ }
+
+ private void cleanup()
+ {
+ long minPage = getMinPageInUse();
+
+ try
+ {
+ System.out.println("MinPage = " + minPage + " firstPage = " + pagingStore.getFirstPage());
+ for (long i = pagingStore.getFirstPage(); i < minPage; i++)
+ {
+ Page page = pagingStore.depage();
+ System.out.println("Deleting files associated with page " + page);
+ page.delete();
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn("Couldn't complete cleanup on paging", e);
+ }
+ }
+
public void printDebug()
{
- for (PageCache cache: softCache.values())
+ for (PageCache cache : softCache.values())
{
System.out.println("Cache " + cache);
}
@@ -264,6 +308,37 @@
// Private -------------------------------------------------------
+ private long getMinPageInUse()
+ {
+ ArrayList<PageCursor> cursorList = new ArrayList<PageCursor>();
+ synchronized (this)
+ {
+ cursorList.addAll(activeCursors.values());
+ cursorList.addAll(nonPersistentCursors);
+ }
+
+ if (cursorList.size() == 0)
+ {
+ return 0l;
+ }
+
+ long minPage = Long.MAX_VALUE;
+
+ System.out.println("CursorList : " + cursorList.size());
+
+ for (PageCursor cursor : cursorList)
+ {
+ long firstPage = cursor.getFirstPage();
+ if (firstPage < minPage)
+ {
+ minPage = firstPage;
+ }
+ }
+
+ return minPage;
+
+ }
+
private PageCache getPageCache(final long pageId)
{
try
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java 2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java 2010-10-16 04:00:15 UTC (rev 9794)
@@ -23,17 +23,6 @@
*/
public interface TestSupportPageStore extends PagingStore
{
- /**
- * Remove the first page from the Writing Queue.
- * The file will still exist until Page.delete is called,
- * So, case the system is reloaded the same Page will be loaded back if delete is not called.
- *
- * @throws Exception
- *
- * Note: This should still be part of the interface, even though HornetQ only uses through the
- */
- Page depage() throws Exception;
-
void forceAnotherPage() throws Exception;
/** @return true if paging was started, or false if paging was already started before this call */
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-16 04:00:15 UTC (rev 9794)
@@ -1028,7 +1028,7 @@
{
SimpleString address = queueInfo.getAddress();
PagingStore store = pagingManager.getPageStore(address);
- PageCursor cursor = store.getCursorProvier().getCursor(encoding.queueID);
+ PageCursor cursor = store.getCursorProvier().getPersistentCursor(encoding.queueID);
cursor.reloadACK(encoding.position);
}
else
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-15 20:18:59 UTC (rev 9793)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-16 04:00:15 UTC (rev 9794)
@@ -106,7 +106,7 @@
public void testSimpleCursor() throws Exception
{
- final int NUM_MESSAGES = 1000;
+ final int NUM_MESSAGES = 100;
int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
@@ -114,7 +114,7 @@
PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(), server.getExecutorFactory());
- PageCursor cursor = cursorProvider.createCursor();
+ PageCursor cursor = cursorProvider.createNonPersistentCursor();
Pair<PagePosition, PagedMessage> msg;
@@ -162,7 +162,7 @@
PageCursorProviderImpl cursorProvider = (PageCursorProviderImpl)this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
- PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+ PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
PageCache firstPage = cursorProvider.getPageCache(new PagePositionImpl(server.getPagingManager().getPageStore(ADDRESS).getFirstPage(), 0));
@@ -194,7 +194,7 @@
server.start();
- cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+ cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
for (int i = firstPageSize; i < NUM_MESSAGES; i++)
{
@@ -227,7 +227,7 @@
PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
- PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+ PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
System.out.println("Cursor: " + cursor);
for (int i = 0 ; i < 100 ; i++)
@@ -246,7 +246,7 @@
server.start();
- cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+ cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
for (int i = 10; i <= 20; i++)
{
@@ -277,7 +277,7 @@
PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
- PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+ PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
System.out.println("Cursor: " + cursor);
@@ -300,7 +300,7 @@
server.start();
- cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+ cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
@@ -336,7 +336,7 @@
PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
- PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+ PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getPersistentCursor(queue.getID());
System.out.println("Cursor: " + cursor);
@@ -382,7 +382,7 @@
PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
- PageCursor cursor = pageStore.getCursorProvier().getCursor(queue.getID());
+ PageCursor cursor = pageStore.getCursorProvier().getPersistentCursor(queue.getID());
System.out.println("Cursor: " + cursor);
@@ -465,21 +465,6 @@
pageStore.page(messages, pgParameter.getTransactionID());
}
- public void testRollbackScenariosOnACK() throws Exception
- {
-
- }
-
- public void testReadRolledBackData() throws Exception
- {
-
- }
-
- public void testRedeliveryScenarios() throws Exception
- {
-
- }
-
public void testCleanupScenarios() throws Exception
{
// Validate the pages are being cleared (with multiple cursors)
13 years, 6 months