JBoss hornetq SVN: r10071 - in trunk: src/main/org/hornetq/core/protocol/core/impl and 4 other directories.
by do-not-reply@jboss.org
Author: igarashitm
Date: 2010-12-22 21:11:49 -0500 (Wed, 22 Dec 2010)
New Revision: 10071
Modified:
trunk/src/config/common/hornetq-version.properties
trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
trunk/src/main/org/hornetq/core/version/Version.java
trunk/src/main/org/hornetq/core/version/impl/VersionImpl.java
trunk/src/main/org/hornetq/utils/VersionLoader.java
trunk/tests/src/org/hornetq/tests/unit/core/version/impl/VersionImplTest.java
Log:
https://issues.jboss.org/browse/HORNETQ-445
added version compatibility matrix validation
Modified: trunk/src/config/common/hornetq-version.properties
===================================================================
--- trunk/src/config/common/hornetq-version.properties 2010-12-22 23:11:40 UTC (rev 10070)
+++ trunk/src/config/common/hornetq-version.properties 2010-12-23 02:11:49 UTC (rev 10071)
@@ -6,3 +6,4 @@
hornetq.version.versionSuffix=CR1
hornetq.version.versionTag=CR1
hornetq.netty.version=@NETTY.VERSION@
+hornetq.version.compatibleVersionList=100-1000
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2010-12-22 23:11:40 UTC (rev 10070)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2010-12-23 02:11:49 UTC (rev 10071)
@@ -127,8 +127,18 @@
try
{
Version version = server.getVersion();
+ int[] compatibleList = version.getCompatibleVersionList();
+ boolean isCompatibleClient = false;
+ for(int i=0; i<compatibleList.length; i++)
+ {
+ if(compatibleList[i] == request.getVersion())
+ {
+ isCompatibleClient = true;
+ break;
+ }
+ }
- if (version.getIncrementingVersion() != request.getVersion())
+ if (!isCompatibleClient)
{
log.warn("Client with version " + request.getVersion() +
" and address " +
Modified: trunk/src/main/org/hornetq/core/version/Version.java
===================================================================
--- trunk/src/main/org/hornetq/core/version/Version.java 2010-12-22 23:11:40 UTC (rev 10070)
+++ trunk/src/main/org/hornetq/core/version/Version.java 2010-12-23 02:11:49 UTC (rev 10071)
@@ -35,6 +35,8 @@
String getVersionSuffix();
int getIncrementingVersion();
+
+ int[] getCompatibleVersionList();
String getNettyVersion();
}
Modified: trunk/src/main/org/hornetq/core/version/impl/VersionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/version/impl/VersionImpl.java 2010-12-22 23:11:40 UTC (rev 10070)
+++ trunk/src/main/org/hornetq/core/version/impl/VersionImpl.java 2010-12-23 02:11:49 UTC (rev 10071)
@@ -46,9 +46,11 @@
private final int incrementingVersion;
private final String versionSuffix;
-
+
private final String nettyVersion;
+ private final int[] compatibleVersionList;
+
// Constructors --------------------------------------------------
public VersionImpl(final String versionName,
@@ -57,7 +59,8 @@
final int microVersion,
final int incrementingVersion,
final String versionSuffix,
- final String nettyVersion)
+ final String nettyVersion,
+ final int[] compatibleVersionList)
{
this.versionName = versionName;
@@ -72,6 +75,8 @@
this.versionSuffix = versionSuffix;
this.nettyVersion = nettyVersion;
+
+ this.compatibleVersionList = compatibleVersionList;
}
// Version implementation ------------------------------------------
@@ -126,6 +131,11 @@
return nettyVersion;
}
+ public int[] getCompatibleVersionList()
+ {
+ return compatibleVersionList;
+ }
+
// Public -------------------------------------------------------
@Override
Modified: trunk/src/main/org/hornetq/utils/VersionLoader.java
===================================================================
--- trunk/src/main/org/hornetq/utils/VersionLoader.java 2010-12-22 23:11:40 UTC (rev 10070)
+++ trunk/src/main/org/hornetq/utils/VersionLoader.java 2010-12-23 02:11:49 UTC (rev 10071)
@@ -16,6 +16,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
+import java.util.StringTokenizer;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.version.Version;
@@ -81,13 +82,16 @@
int incrementingVersion = Integer.valueOf(versionProps.getProperty("hornetq.version.incrementingVersion"));
String versionSuffix = versionProps.getProperty("hornetq.version.versionSuffix");
String nettyVersion = versionProps.getProperty("hornetq.netty.version");
+ int[] compatibleVersionArray = parseCompatibleVersionList(versionProps.getProperty("hornetq.version.compatibleVersionList"));
+
return new VersionImpl(versionName,
majorVersion,
minorVersion,
microVersion,
incrementingVersion,
versionSuffix,
- nettyVersion);
+ nettyVersion,
+ compatibleVersionArray);
}
catch (IOException e)
{
@@ -108,4 +112,68 @@
}
}
+
+ private static int[] parseCompatibleVersionList(String property) throws IOException
+ {
+ int[] verArray = new int[0];
+ StringTokenizer tokenizer = new StringTokenizer(property,",");
+ while(tokenizer.hasMoreTokens())
+ {
+ int from = -1, to = -1;
+ String token = tokenizer.nextToken();
+
+ int cursor = 0;
+ char firstChar = token.charAt(0);
+ if(firstChar == '-')
+ {
+ // "-n" pattern
+ from = 0;
+ cursor++;
+ for(;cursor < token.length() && Character.isDigit(token.charAt(cursor)); cursor++);
+ if(cursor > 1)
+ {
+ to = Integer.parseInt(token.substring(1, cursor));
+ }
+ }
+ else if(Character.isDigit(firstChar))
+ {
+ for(;cursor < token.length() && Character.isDigit(token.charAt(cursor)); cursor++);
+ from = Integer.parseInt(token.substring(0, cursor));
+
+ if(cursor == token.length())
+ {
+ // just "n" pattern
+ to = from;
+ }
+ else if(token.charAt(cursor)== '-')
+ {
+ cursor++;
+ if(cursor == token.length())
+ {
+ // "n-" pattern
+ to = Integer.MAX_VALUE;
+ }
+ else
+ {
+ // "n-n" pattern
+ to = Integer.parseInt(token.substring(cursor));
+ }
+ }
+ }
+
+ if(from != -1 && to != -1)
+ {
+ // merge version array
+ int[] newArray = new int[verArray.length + to-from+1];
+ System.arraycopy(verArray, 0, newArray, 0, verArray.length);
+ for(int i=0; i<to-from+1; i++)
+ {
+ newArray[verArray.length+i] = from + i;
+ }
+ verArray = newArray;
+ }
+ }
+
+ return verArray;
+ }
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/version/impl/VersionImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/version/impl/VersionImplTest.java 2010-12-22 23:11:40 UTC (rev 10070)
+++ trunk/tests/src/org/hornetq/tests/unit/core/version/impl/VersionImplTest.java 2010-12-23 02:11:49 UTC (rev 10071)
@@ -51,13 +51,15 @@
int incrementingVersion = 10;
String versionSuffix = "suffix";
String nettyVersion = "netty";
+ int[] compatibleVersionList = {7,8,9,10};
VersionImpl version = new VersionImpl(versionName,
majorVersion,
minorVersion,
microVersion,
incrementingVersion,
versionSuffix,
- nettyVersion);
+ nettyVersion,
+ compatibleVersionList);
Assert.assertEquals(versionName, version.getVersionName());
Assert.assertEquals(majorVersion, version.getMajorVersion());
@@ -70,9 +72,9 @@
public void testEquals() throws Exception
{
String nettyVersion = "netty";
- VersionImpl version = new VersionImpl("HORNETQ", 2, 0, 1, 10, "suffix", nettyVersion);
- VersionImpl sameVersion = new VersionImpl("HORNETQ", 2, 0, 1, 10, "suffix", nettyVersion);
- VersionImpl differentVersion = new VersionImpl("HORNETQ", 2, 0, 1, 11, "suffix", nettyVersion);
+ VersionImpl version = new VersionImpl("HORNETQ", 2, 0, 1, 10, "suffix", nettyVersion, new int[]{7,8,9,10});
+ VersionImpl sameVersion = new VersionImpl("HORNETQ", 2, 0, 1, 10, "suffix", nettyVersion, new int[]{7,8,9,10});
+ VersionImpl differentVersion = new VersionImpl("HORNETQ", 2, 0, 1, 11, "suffix", nettyVersion, new int[]{7,8,9,10,11});
Assert.assertFalse(version.equals(new Object()));
@@ -84,7 +86,7 @@
public void testSerialize() throws Exception
{
String nettyVersion = "netty";
- VersionImpl version = new VersionImpl("uyiuy", 3, 7, 6, 12, "uhuhuh", nettyVersion);
+ VersionImpl version = new VersionImpl("uyiuy", 3, 7, 6, 12, "uhuhuh", nettyVersion, new int[]{9,10,11,12});
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(version);
15 years, 5 months
JBoss hornetq SVN: r10070 - in trunk: src/main/org/hornetq/core/paging/cursor/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-22 18:11:40 -0500 (Wed, 22 Dec 2010)
New Revision: 10070
Modified:
trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Paging changes
Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-12-22 20:03:31 UTC (rev 10069)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-12-22 23:11:40 UTC (rev 10070)
@@ -36,6 +36,8 @@
void bookmark(PagePosition position) throws Exception;
PageSubscriptionCounter getCounter();
+
+ long getMessageCount();
long getId();
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-12-22 20:03:31 UTC (rev 10069)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-12-22 23:11:40 UTC (rev 10070)
@@ -28,6 +28,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
@@ -101,6 +102,8 @@
private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
private final PageSubscriptionCounter counter;
+
+ private final AtomicLong deliveredCount = new AtomicLong(0);
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
@@ -171,6 +174,13 @@
confirmPosition(position);
}
+
+
+ public long getMessageCount()
+ {
+ return counter.getValue() - deliveredCount.get();
+ }
+
public PageSubscriptionCounter getCounter()
{
return counter;
@@ -959,6 +969,7 @@
for (PagePosition confirmed : positions)
{
cursor.processACK(confirmed);
+ cursor.deliveredCount.decrementAndGet();
}
}
@@ -1195,6 +1206,7 @@
{
if (!isredelivery)
{
+ deliveredCount.incrementAndGet();
PageSubscriptionImpl.this.getPageInfo(position).remove(position);
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-12-22 20:03:31 UTC (rev 10069)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-12-22 23:11:40 UTC (rev 10070)
@@ -661,7 +661,7 @@
{
if (pageSubscription != null)
{
- return messageReferences.size() + getScheduledCount() + getDeliveringCount() + pageSubscription.getCounter().getValue();
+ return messageReferences.size() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
}
else
{
@@ -1639,6 +1639,11 @@
}
queue.deliveringCount.decrementAndGet();
+
+ if (queue.deliveringCount.get() < 0)
+ {
+ new Exception("DeliveringCount became negative").printStackTrace();
+ }
try
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-12-22 20:03:31 UTC (rev 10069)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-12-22 23:11:40 UTC (rev 10070)
@@ -236,6 +236,8 @@
assertNull(consumer.receiveImmediate());
sessionCheck.close();
+
+ assertEquals(numberOfMessages, queue.getMessageCount());
sf.close();
locator.close();
@@ -277,6 +279,7 @@
locator.close();
+ queue.getMessageCount();
//assertEquals(numberOfMessages, queue.getMessageCount());
}
finally
15 years, 5 months
JBoss hornetq SVN: r10069 - trunk/src/main/org/hornetq/core/paging/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-22 15:03:31 -0500 (Wed, 22 Dec 2010)
New Revision: 10069
Modified:
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
Log:
removing mistakenly added System.outs
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-12-22 20:03:08 UTC (rev 10068)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-12-22 20:03:31 UTC (rev 10069)
@@ -896,7 +896,6 @@
{
if (sync)
{
- System.out.println("Doing a sync on page");
sync();
}
}
15 years, 5 months
JBoss hornetq SVN: r10068 - trunk/src/main/org/hornetq/core/paging/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-22 15:03:08 -0500 (Wed, 22 Dec 2010)
New Revision: 10068
Modified:
trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java
Log:
removing mistakenly added System.outs
Modified: trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java 2010-12-22 19:30:59 UTC (rev 10067)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java 2010-12-22 20:03:08 UTC (rev 10068)
@@ -80,7 +80,6 @@
private void tick()
{
- System.out.println("Tick on PageSynctimer");
OperationContext [] pendingSyncsArray;
synchronized (this)
{
@@ -92,9 +91,7 @@
try
{
- System.out.println("will perform a sync");
store.ioSync();
- System.out.println("done with the sync");
}
catch (Exception e)
{
15 years, 5 months
JBoss hornetq SVN: r10067 - in trunk: src/main/org/hornetq/core/paging/cursor and 13 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-22 14:30:59 -0500 (Wed, 22 Dec 2010)
New Revision: 10067
Added:
trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java
Modified:
trunk/src/main/org/hornetq/core/paging/PagingManager.java
trunk/src/main/org/hornetq/core/paging/PagingStore.java
trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/persistence/OperationContext.java
trunk/src/main/org/hornetq/core/persistence/StorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java
Log:
Changes on paging
Modified: trunk/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingManager.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/PagingManager.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -52,9 +52,6 @@
/** An injection point for the PostOffice to inject itself */
void setPostOffice(PostOffice postOffice);
- /** Used to start depaging every paged address, after a reload/restart */
- void resumeDepages() throws Exception;
-
/**
* Point to inform/restoring Transactions used when the messages were added into paging
* */
Modified: trunk/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -57,7 +57,11 @@
boolean isPaging();
+ // It will schedule sync to the file storage
void sync() throws Exception;
+
+ // It will perform a real sync on the current IO file
+ void ioSync() throws Exception;
boolean page(ServerMessage message, RoutingContext ctx) throws Exception;
@@ -88,12 +92,6 @@
Page getCurrentPage();
- /**
- * @return false if a thread was already started, or if not in page mode
- * @throws Exception
- */
- boolean startDepaging();
-
/** @return true if paging was started, or false if paging was already started before this call */
boolean startPaging() throws Exception;
Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -33,7 +33,6 @@
// To be called before the server is down
void stop();
- // TODO: this method is only used on testcases and can go away
void bookmark(PagePosition position) throws Exception;
PageSubscriptionCounter getCounter();
Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -33,7 +33,7 @@
void loadInc(final long recordInd, final int add);
- void replayIncrement(Transaction tx, long recordID, int add);
+ void applyIncrement(Transaction tx, long recordID, int add);
/** This will process the reload */
void processReload();
Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -100,20 +100,31 @@
*/
public void increment(Transaction tx, int add) throws Exception
{
-
- if (persistent)
+ if (tx == null)
{
- tx.setContainsPersistent();
- long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
- replayIncrement(tx, id, add);
+ if (persistent)
+ {
+ long id = storage.storePageCounterInc(this.subscriptionID, add);
+ incrementProcessed(id, add);
+ }
+ else
+ {
+ incrementProcessed(-1, add);
+ }
}
else
{
- replayIncrement(tx, -1, add);
+ if (persistent)
+ {
+ tx.setContainsPersistent();
+ long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
+ applyIncrement(tx, id, add);
+ }
+ else
+ {
+ applyIncrement(tx, -1, add);
+ }
}
-
-
-
}
/**
@@ -122,7 +133,7 @@
* @param recordID
* @param add
*/
- public void replayIncrement(Transaction tx, long recordID, int add)
+ public void applyIncrement(Transaction tx, long recordID, int add)
{
CounterOperations oper = (CounterOperations)tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC);
@@ -194,13 +205,13 @@
public void addInc(long id, int variance)
{
value.addAndGet(variance);
-
+
if (id >= 0)
{
incrementRecords.add(id);
}
}
-
+
/** used on testing only */
public void setPersistent(final boolean persistent)
{
@@ -215,6 +226,10 @@
long valueReplace;
synchronized (this)
{
+ if (incrementRecords.size() <= FLUSH_COUNTER)
+ {
+ return;
+ }
valueReplace = value.get();
deleteList = new ArrayList<Long>(incrementRecords.size());
deleteList.addAll(incrementRecords);
@@ -242,7 +257,7 @@
storage.commit(txCleanup);
storage.waitOnOperations();
- }
+ }
catch (Exception e)
{
newRecordID = recordID;
Added: trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java (rev 0)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.impl;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.persistence.OperationContext;
+
+/**
+ * This will batch multiple calls waiting to perform a sync in a single call
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PageSyncTimer
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final PagingStore store;
+
+ private final ScheduledExecutorService scheduledExecutor;
+
+ private boolean pendingSync;
+
+ private final long timeSync;
+
+ private final Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ tick();
+ }
+ };
+
+ private List<OperationContext> syncOperations = new LinkedList<OperationContext>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PageSyncTimer(PagingStore store, ScheduledExecutorService scheduledExecutor, long timeSync)
+ {
+ this.store = store;
+ this.scheduledExecutor = scheduledExecutor;
+ this.timeSync = timeSync;
+ }
+
+ // Public --------------------------------------------------------
+
+ public synchronized void addSync(OperationContext ctx)
+ {
+ ctx.pageSyncLineUp();
+ if (!pendingSync)
+ {
+ pendingSync = true;
+ scheduledExecutor.schedule(runnable, timeSync, TimeUnit.NANOSECONDS);
+ }
+ syncOperations.add(ctx);
+ }
+
+ private void tick()
+ {
+ System.out.println("Tick on PageSynctimer");
+ OperationContext [] pendingSyncsArray;
+ synchronized (this)
+ {
+ pendingSync = false;
+ pendingSyncsArray = new OperationContext[syncOperations.size()];
+ pendingSyncsArray = syncOperations.toArray(pendingSyncsArray);
+ syncOperations.clear();
+ }
+
+ try
+ {
+ System.out.println("will perform a sync");
+ store.ioSync();
+ System.out.println("done with the sync");
+ }
+ catch (Exception e)
+ {
+ for (OperationContext ctx : pendingSyncsArray)
+ {
+ ctx.onError(HornetQException.IO_ERROR, e.getMessage());
+ }
+ }
+ finally
+ {
+ // In case of failure, the context should propagate an exception to the client
+ // We send an exception to the client even on the case of a failure
+ // to avoid possible locks and the client not getting the exception back
+ for (OperationContext ctx : pendingSyncsArray)
+ {
+ ctx.pageSyncDone();
+ }
+ }
+ }
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -149,6 +149,7 @@
{
if (lateDeliveries != null)
{
+ // This is to make sure deliveries that were touched before the commit arrived will be delivered
for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
{
pos.a.redeliver(pos.b);
@@ -164,7 +165,9 @@
storageManager.storePageTransaction(tx.getID(), this);
}
- /* (non-Javadoc)
+ /*
+ * This is to be used after paging. We will update the PageTransactions until they get all the messages delivered. In that case we will delete the page TX
+ * (non-Javadoc)
* @see org.hornetq.core.paging.PageTransactionInfo#storeUpdate(org.hornetq.core.persistence.StorageManager, org.hornetq.core.transaction.Transaction, int)
*/
public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx) throws Exception
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -207,28 +207,6 @@
pagingStoreFactory.stop();
}
- public void resumeDepages()
- {
- if (!started)
- {
- // If stop the server while depaging, the server may call a rollback,
- // the rollback may addSizes back and that would fire a globalDepage.
- // Because of that we must ignore any startGlobalDepage calls,
- // and this check needs to be done outside of the lock
- return;
- }
- synchronized (this)
- {
- for (PagingStore store : stores.values())
- {
- if (store.isPaging())
- {
- store.startDepaging();
- }
- }
- }
- }
-
public void processReload() throws Exception
{
for (PagingStore store: stores.values())
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -61,6 +62,10 @@
protected final boolean syncNonTransactional;
private PagingManager pagingManager;
+
+ private final ScheduledExecutorService scheduledExecutor;
+
+ private final long syncTimeout;
private StorageManager storageManager;
@@ -71,6 +76,8 @@
// Constructors --------------------------------------------------
public PagingStoreFactoryNIO(final String directory,
+ final long syncTimeout,
+ final ScheduledExecutorService scheduledExecutor,
final ExecutorFactory executorFactory,
final boolean syncNonTransactional)
{
@@ -79,6 +86,10 @@
this.executorFactory = executorFactory;
this.syncNonTransactional = syncNonTransactional;
+
+ this.scheduledExecutor = scheduledExecutor;
+
+ this.syncTimeout = syncTimeout;
}
// Public --------------------------------------------------------
@@ -91,6 +102,8 @@
{
return new PagingStoreImpl(address,
+ scheduledExecutor,
+ syncTimeout,
pagingManager,
storageManager,
postOffice,
@@ -195,6 +208,8 @@
AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
PagingStore store = new PagingStoreImpl(address,
+ scheduledExecutor,
+ syncTimeout,
pagingManager,
storageManager,
postOffice,
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -15,10 +15,14 @@
import java.text.DecimalFormat;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -39,7 +43,9 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.impl.LivePageCacheImpl;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.MessageReference;
@@ -50,6 +56,7 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.Transaction.State;
+import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
@@ -89,6 +96,9 @@
private final PagingStoreFactory storeFactory;
+ // Used to schedule sync threads
+ private final PageSyncTimer syncTimer;
+
private final long maxSize;
private final long pageSize;
@@ -113,7 +123,7 @@
private volatile int currentPageId;
private volatile Page currentPage;
-
+
private volatile boolean paging = false;
/** duplicate cache used at this address */
@@ -142,6 +152,8 @@
// Constructors --------------------------------------------------
public PagingStoreImpl(final SimpleString address,
+ final ScheduledExecutorService scheduledExecutor,
+ final long syncTimeout,
final PagingManager pagingManager,
final StorageManager storageManager,
final PostOffice postOffice,
@@ -191,6 +203,15 @@
this.syncNonTransactional = syncNonTransactional;
+ if (scheduledExecutor != null)
+ {
+ this.syncTimer = new PageSyncTimer(this, scheduledExecutor, syncTimeout);
+ }
+ else
+ {
+ this.syncTimer = null;
+ }
+
this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory);
// Post office could be null on the backup node
@@ -303,7 +324,7 @@
{
return storeName;
}
-
+
public boolean page(final ServerMessage message, final RoutingContext ctx) throws Exception
{
return page(message, ctx, ctx.getContextListing(storeName));
@@ -311,73 +332,37 @@
public boolean page(final ServerMessage message, final RoutingContext ctx, RouteContextList listCtx) throws Exception
{
- // The sync on transactions is done on commit only
- // TODO: sync on paging
- return page(message, ctx, listCtx, false);
+ return page(message, ctx, listCtx, syncNonTransactional && ctx.getTransaction() == null);
}
public void sync() throws Exception
{
- lock.readLock().lock();
-
- try
+ if (syncTimer != null)
{
- if (currentPage != null)
- {
- currentPage.sync();
- }
+ syncTimer.addSync(storageManager.getContext());
}
- finally
+ else
{
- lock.readLock().unlock();
+ ioSync();
}
+
}
- public boolean startDepaging()
+ public void ioSync() throws Exception
{
+ lock.readLock().lock();
- // Disabled for now
-
- return false;
-
- /*
- if (!running)
- {
- return false;
- }
-
- currentPageLock.readLock().lock();
try
{
- if (currentPage == null)
+ if (currentPage != null)
{
- return false;
+ currentPage.sync();
}
- else
- {
- // startDepaging and clearDepage needs to be atomic.
- // We can't use writeLock to this operation as writeLock would still be used by another thread, and still
- // being a valid usage
- synchronized (this)
- {
- if (!depaging.get())
- {
- depaging.set(true);
- Runnable depageAction = new DepageRunnable(executor);
- executor.execute(depageAction);
- return true;
- }
- else
- {
- return false;
- }
- }
- }
}
finally
{
- currentPageLock.readLock().unlock();
- } */
+ lock.readLock().unlock();
+ }
}
public void processReload() throws Exception
@@ -415,11 +400,11 @@
}
}
}
-
+
public void flushExecutors()
{
cursorProvider.flushExecutors();
-
+
Future future = new Future();
executor.execute(future);
@@ -498,7 +483,7 @@
cursorProvider.addPageCache(pageCache);
}
-
+
// We will not mark it for paging if there's only a single empty file
if (currentPage != null && !(numberOfPages == 1 && currentPage.getSize() == 0))
{
@@ -513,7 +498,7 @@
lock.writeLock().unlock();
}
}
-
+
public void stopPaging()
{
lock.writeLock().lock();
@@ -551,7 +536,7 @@
{
return false;
}
-
+
if (currentPage == null)
{
try
@@ -568,7 +553,7 @@
}
paging = true;
-
+
return true;
}
finally
@@ -805,19 +790,6 @@
}
}
}
- else
- {
- if (maxSize > 0 && currentPage != null && addressSize <= maxSize - pageSize && !depaging.get())
- {
- if (startDepaging())
- {
- if (PagingStoreImpl.isTrace)
- {
- PagingStoreImpl.trace("Starting depaging Thread, size = " + addressSize);
- }
- }
- }
- }
return;
}
@@ -878,8 +850,6 @@
}
Transaction tx = ctx.getTransaction();
-
- boolean startedTx = false;
lock.writeLock().lock();
@@ -889,12 +859,6 @@
{
return false;
}
-
- if (tx == null)
- {
- tx = new TransactionImpl(storageManager);
- startedTx = true;
- }
PagedMessage pagedMessage;
@@ -905,7 +869,7 @@
message.bodyChanged();
}
- pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), getTransactionID(tx, listCtx));
+ pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), installPageTransaction(tx, listCtx));
int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
@@ -917,15 +881,31 @@
currentPage.write(pagedMessage);
+ if (tx != null)
+ {
+ SyncPageStoreTX syncPage = (SyncPageStoreTX)tx.getProperty(TransactionPropertyIndexes.PAGE_SYNC);
+ if (syncPage == null)
+ {
+ syncPage = new SyncPageStoreTX();
+ tx.putProperty(TransactionPropertyIndexes.PAGE_SYNC, syncPage);
+ tx.addOperation(syncPage);
+ }
+ syncPage.addStore(this);
+ }
+ else
+ {
+ if (sync)
+ {
+ System.out.println("Doing a sync on page");
+ sync();
+ }
+ }
+
return true;
}
finally
{
lock.writeLock().unlock();
- if (startedTx)
- {
- tx.commit();
- }
}
}
@@ -934,7 +914,7 @@
{
List<org.hornetq.core.server.Queue> durableQueues = ctx.getDurableQueues();
List<org.hornetq.core.server.Queue> nonDurableQueues = ctx.getNonDurableQueues();
- long ids[] = new long [durableQueues.size() + nonDurableQueues.size()];
+ long ids[] = new long[durableQueues.size() + nonDurableQueues.size()];
int i = 0;
for (org.hornetq.core.server.Queue q : durableQueues)
@@ -942,7 +922,7 @@
q.getPageSubscription().getCounter().increment(tx, 1);
ids[i++] = q.getID();
}
-
+
for (org.hornetq.core.server.Queue q : nonDurableQueues)
{
q.getPageSubscription().getCounter().increment(tx, 1);
@@ -950,8 +930,8 @@
}
return ids;
}
-
- private long getTransactionID(final Transaction tx, final RouteContextList listCtx) throws Exception
+
+ private long installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception
{
if (tx == null)
{
@@ -959,7 +939,7 @@
}
else
{
- PageTransactionInfo pgTX = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+ PageTransactionInfo pgTX = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
if (pgTX == null)
{
pgTX = new PageTransactionInfoImpl(tx.getID());
@@ -967,25 +947,81 @@
tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgTX);
tx.addOperation(new FinishPageMessageOperation(pgTX));
}
-
+
pgTX.increment(listCtx.getNumberOfQueues());
-
+
return tx.getID();
}
}
-
+   /**
+    * Transaction operation that syncs every paging store registered through
+    * {@link #addStore(PagingStore)} before the transaction is prepared or committed.
+    */
+   private static class SyncPageStoreTX extends TransactionOperationAbstract
+   {
+      // A set, so a store that receives several messages in one transaction is only synced once per pass
+      private final Set<PagingStore> storesToSync = new HashSet<PagingStore>();
+
+      /**
+       * Registers a store to be synced by {@link #beforePrepare(Transaction)} / {@link #beforeCommit(Transaction)}.
+       * @param store the paging store that was written to
+       */
+      public void addStore(PagingStore store)
+      {
+         storesToSync.add(store);
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
+       */
+      public void beforePrepare(Transaction tx) throws Exception
+      {
+         sync();
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
+       */
+      public void beforeCommit(Transaction tx) throws Exception
+      {
+         sync();
+      }
+
+      /**
+       * Calls sync() on every registered store and then waits on whatever context is bound to the
+       * current thread afterwards. The caller's own context is put back before returning, also on failure.
+       * @throws Exception if a store sync or the wait fails
+       */
+      private void sync() throws Exception
+      {
+         // Keep the caller's thread-bound context so it can be restored in the finally block
+         OperationContext originalTX = OperationContextImpl.getContext();
+
+         try
+         {
+            // We only want to sync paging here, no need to wait for any other events
+            OperationContextImpl.clearContext();
+
+            for (PagingStore store : storesToSync)
+            {
+               store.sync();
+            }
+
+            // We can't perform a commit/sync on the journal before we can ensure the page files are synced,
+            // or the journal and the page files may get out of sync
+            OperationContext ctx = OperationContextImpl.getContext();
+
+            if (ctx != null)
+            {
+               // if null it means there were no operations done before, hence no need to wait for any completions
+               ctx.waitCompletion();
+            }
+         }
+         finally
+         {
+            OperationContextImpl.setContext(originalTX);
+         }
+      }
+   }
+
private class FinishPageMessageOperation implements TransactionOperation
{
private final PageTransactionInfo pageTransaction;
-
+
private boolean stored = false;
public FinishPageMessageOperation(final PageTransactionInfo pageTransaction)
{
this.pageTransaction = pageTransaction;
}
-
+
public void afterCommit(final Transaction tx)
{
// If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
@@ -1019,7 +1055,7 @@
{
storePageTX(tx);
}
-
+
private void storePageTX(final Transaction tx) throws Exception
{
if (!stored)
@@ -1044,24 +1080,6 @@
}
- /**
- * This method will remove files from the page system and and route them, doing it transactionally
- *
- * If persistent messages are also used, it will update eventual PageTransactions
- */
-
- /**
- * @param pageId
- * @return
- */
- private byte[] generateDuplicateID(final int pageId)
- {
- byte duplicateIdForPage[] = new SimpleString("page-" + pageId).getData();
- return duplicateIdForPage;
- }
-
-
-
private void openNewPage() throws Exception
{
lock.writeLock().lock();
@@ -1126,40 +1144,4 @@
}
// Inner classes -------------------------------------------------
-
- /* private class DepageRunnable implements Runnable
- {
- private final Executor followingExecutor;
-
- public DepageRunnable(final Executor followingExecutor)
- {
- this.followingExecutor = followingExecutor;
- }
-
- public void run()
- {
- try
- {
- if (running)
- {
- if (!isAddressFull(getPageSizeBytes()))
- {
- readPage();
- }
-
- // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed
- // however clearDepage shouldn't be executed if the page-store is being stopped, as stop will be holding
- // the lock and this would dead lock
- if (running && !clearDepage())
- {
- followingExecutor.execute(this);
- }
- }
- }
- catch (Throwable e)
- {
- PagingStoreImpl.log.error(e, e);
- }
- }
- } */
}
Modified: trunk/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/OperationContext.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/persistence/OperationContext.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -34,6 +34,10 @@
void replicationLineUp();
void replicationDone();
+
+ void pageSyncLineUp();
+
+ void pageSyncDone();
void waitCompletion() throws Exception;
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -205,5 +205,11 @@
*/
long storePageCounterInc(long txID, long queueID, int add) throws Exception;
+ /**
+ * @return the ID of the increment record
+ * @throws Exception
+ */
+ long storePageCounterInc(long queueID, int add) throws Exception;
+
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -1059,7 +1059,6 @@
encoding.decode(buff);
-
PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
if (sub != null)
@@ -1126,9 +1125,16 @@
}
}
- loadPreparedTransactions(postOffice, pagingManager, resourceManager, queues, queueInfos, preparedTransactions, duplicateIDMap, pageSubscriptions);
-
- for (PageSubscription sub: pageSubscriptions.values())
+ loadPreparedTransactions(postOffice,
+ pagingManager,
+ resourceManager,
+ queues,
+ queueInfos,
+ preparedTransactions,
+ duplicateIDMap,
+ pageSubscriptions);
+
+ for (PageSubscription sub : pageSubscriptions.values())
{
sub.getCounter().processReload();
}
@@ -1205,7 +1211,7 @@
pageSubscriptions.put(queueID, subs);
}
}
-
+
return subs;
}
@@ -1262,6 +1268,20 @@
}
/* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, int)
+ */
+ public long storePageCounterInc(long queueID, int value) throws Exception
+ {
+ // Variant without a transaction ID: the increment is appended as its own add-record
+ // under a freshly generated record ID, which is returned to the caller.
+ long recordID = idGenerator.generateID();
+ messageJournal.appendAddRecord(recordID,
+ JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
+ new PageCountRecordInc(queueID, value),
+ true,
+ getContext());
+ return recordID;
+ }
+
+ /* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
*/
public long storePageCounter(long txID, long queueID, long value) throws Exception
@@ -1685,8 +1705,21 @@
}
case ACKNOWLEDGE_CURSOR:
{
- // TODO: implement and test this case
- // and make sure the rollback will work well also
+ CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
+ encoding.decode(buff);
+
+ encoding.position.setRecordID(record.id);
+
+ PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+
+ if (sub != null)
+ {
+ sub.reloadPreparedACK(tx, encoding.position);
+ }
+ else
+ {
+ log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ }
break;
}
case PAGE_CURSOR_COUNTER_VALUE:
@@ -1702,12 +1735,14 @@
encoding.decode(buff);
+ PageSubscription sub = locateSubscription(encoding.queueID,
+ pageSubscriptions,
+ queueInfos,
+ pagingManager);
- PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
-
if (sub != null)
{
- sub.getCounter().replayIncrement(tx, record.id, encoding.value);
+ sub.getCounter().applyIncrement(tx, record.id, encoding.value);
}
else
{
@@ -1872,6 +1907,20 @@
return true;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.OperationContext#pageSyncLineUp()
+ */
+ public void pageSyncLineUp()
+ {
+ // No-op in this implementation
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.OperationContext#pageSyncDone()
+ */
+ public void pageSyncDone()
+ {
+ // No-op in this implementation
+ }
+
}
private static class XidEncoding implements EncodingSupport
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -49,14 +49,26 @@
{
OperationContextImpl.threadLocalContext.set(null);
}
-
+
+ /**
+ * Returns the OperationContext bound to the current thread, or null when none is bound.
+ * Delegates to getContext(ExecutorFactory) with a null factory, which returns null
+ * instead of creating a new context.
+ */
+ public static OperationContext getContext()
+ {
+ return getContext(null);
+ }
+
public static OperationContext getContext(final ExecutorFactory executorFactory)
{
OperationContext token = OperationContextImpl.threadLocalContext.get();
if (token == null)
{
- token = new OperationContextImpl(executorFactory.getExecutor());
- OperationContextImpl.threadLocalContext.set(token);
+ if (executorFactory == null)
+ {
+ return null;
+ }
+ else
+ {
+ token = new OperationContextImpl(executorFactory.getExecutor());
+ OperationContextImpl.threadLocalContext.set(token);
+ }
}
return token;
}
@@ -68,17 +80,23 @@
private List<TaskHolder> tasks;
+ private int minimalStore = Integer.MAX_VALUE;
+
+ private int minimalReplicated = Integer.MAX_VALUE;
+
+ private int minimalPage = Integer.MAX_VALUE;
+
private volatile int storeLineUp = 0;
private volatile int replicationLineUp = 0;
+
+ private volatile int pageLineUp = 0;
- private int minimalStore = Integer.MAX_VALUE;
-
- private int minimalReplicated = Integer.MAX_VALUE;
-
private int stored = 0;
private int replicated = 0;
+
+ private int paged = 0;
private int errorCode = -1;
@@ -93,6 +111,17 @@
super();
this.executor = executor;
}
+
+ /**
+ * Registers one more expected page sync by incrementing the pageLineUp counter.
+ */
+ public void pageSyncLineUp()
+ {
+ // NOTE(review): pageLineUp is volatile and ++ is a non-atomic read-modify-write;
+ // presumably line-ups for one context are issued from a single thread - confirm.
+ pageLineUp++;
+ }
+
+ /**
+ * Records one completed page sync and re-evaluates the pending tasks through checkTasks().
+ * Runs while holding this context's monitor.
+ */
+ public synchronized void pageSyncDone()
+ {
+ paged++;
+ checkTasks();
+ }
public void storeLineUp()
{
@@ -127,10 +156,11 @@
tasks = new LinkedList<TaskHolder>();
minimalReplicated = replicationLineUp;
minimalStore = storeLineUp;
+ minimalPage = pageLineUp;
}
// On this case, we can just execute the context directly
- if (replicationLineUp == replicated && storeLineUp == stored)
+ if (replicationLineUp == replicated && storeLineUp == stored && pageLineUp == paged)
{
// We want to avoid the executor if everything is complete...
// However, we can't execute the context if there are executions pending
@@ -168,13 +198,13 @@
private void checkTasks()
{
- if (stored >= minimalStore && replicated >= minimalReplicated)
+ if (stored >= minimalStore && replicated >= minimalReplicated && paged >= minimalPage)
{
Iterator<TaskHolder> iter = tasks.iterator();
while (iter.hasNext())
{
TaskHolder holder = iter.next();
- if (stored >= holder.storeLined && replicated >= holder.replicationLined)
+ if (stored >= holder.storeLined && replicated >= holder.replicationLined && paged >= holder.pageLined)
{
// If set, we use an executor to avoid the server being single threaded
execute(holder.task);
@@ -250,6 +280,8 @@
int storeLined;
int replicationLined;
+
+ int pageLined;
IOAsyncTask task;
@@ -257,6 +289,7 @@
{
storeLined = storeLineUp;
replicationLined = replicationLineUp;
+ pageLined = pageLineUp;
this.task = task;
}
}
@@ -288,4 +321,25 @@
}
}
+   /**
+    * Renders each line-up counter next to its completed counterpart (store, replication, page).
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      final StringBuilder text = new StringBuilder("OperationContextImpl [storeLineUp=");
+      text.append(storeLineUp);
+      text.append(", stored=").append(stored);
+      text.append(", replicationLineUp=").append(replicationLineUp);
+      text.append(", replicated=").append(replicated);
+      text.append(", pageLineUp=").append(pageLineUp);
+      text.append(", paged=").append(paged);
+      text.append("]");
+      return text.toString();
+   }
+
+
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -507,4 +507,13 @@
return 0;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, int)
+ */
+ public long storePageCounterInc(long queueID, int add) throws Exception
+ {
+ // Nothing is written here; 0 is returned as the record ID
+ return 0;
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -219,6 +219,8 @@
journalLoadInformation = storage.loadInternalOnly();
pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(config.getPagingDirectory(),
+ config.getJournalBufferSize_NIO(),
+ server.getScheduledPool(),
server.getExecutorFactory(),
config.isJournalSyncNonTransactional()),
storage,
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -15,6 +15,7 @@
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
import javax.management.MBeanServer;
@@ -140,6 +141,8 @@
void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
+ ScheduledExecutorService getScheduledPool();
+
ExecutorFactory getExecutorFactory();
void setGroupingHandler(GroupingHandler groupingHandler);
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -764,6 +764,12 @@
// HornetQServer implementation
// -----------------------------------------------------------
+
+ /**
+ * @return the scheduledPool field of this server
+ */
+ public ScheduledExecutorService getScheduledPool()
+ {
+ return scheduledPool;
+ }
+
public Configuration getConfiguration()
{
return configuration;
@@ -1110,7 +1116,10 @@
protected PagingManager createPagingManager()
{
+
return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+ (long)configuration.getJournalBufferSize_NIO(),
+ scheduledPool,
executorFactory,
configuration.isJournalSyncNonTransactional()),
storageManager,
Modified: trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -25,6 +25,8 @@
public class TransactionPropertyIndexes
{
+ public static final int PAGE_SYNC = 2;
+
public static final int PAGE_COUNT_INC = 3;
public static final int PAGE_TRANSACTION_UPDATE = 4;
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -13,47 +13,46 @@
package org.hornetq.tests.integration.client;
-import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import junit.framework.Assert;
import junit.framework.AssertionFailedError;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
-import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.paging.PagingStoreFactory;
-import org.hornetq.core.paging.impl.PagingManagerImpl;
-import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
-import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.spi.core.security.HornetQSecurityManager;
-import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.ExecutorFactory;
/**
* A PagingTest
@@ -111,7 +110,189 @@
super.tearDown();
}
+
+   /**
+    * Sends 10000 persistent messages, restarts the server, consumes all of them inside XA
+    * transactions that are prepared but never committed, restarts again and checks that nothing
+    * is delivered until the prepared transactions have been rolled back.
+    */
+   public void testPreparePersistent() throws Exception
+   {
+      boolean persistentMessages = true;
+
+      System.out.println("PageDir:" + getPageDir());
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 10000;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         ByteBuffer bb = ByteBuffer.wrap(body);
+
+         for (int j = 1; j <= messageSize; j++)
+         {
+            bb.put(getSamplebyte(j));
+         }
+
+         // Phase 1: produce the messages, committing every 1000 sends
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(persistentMessages);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+               session.commit();
+            }
+         }
+         session.commit();
+         session.close();
+         session = null;
+
+         sf.close();
+         locator.close();
+
+         server.stop();
+
+         // Phase 2: restart and consume everything inside prepared XA transactions
+         server = createServer(true,
+                               config,
+                               PagingTest.PAGE_SIZE,
+                               PagingTest.PAGE_MAX,
+                               new HashMap<String, AddressSettings>());
+         server.start();
+
+         locator = createInVMNonHALocator();
+         sf = locator.createSessionFactory();
+
+         Queue queue = server.locateQueue(ADDRESS);
+
+         assertEquals(numberOfMessages, queue.getMessageCount());
+
+
+         LinkedList<Xid> xids = new LinkedList<Xid>();
+
+         int msgReceived = 0;
+         // 10000 / 999 == 10 transactions of up to 1000 messages each, which covers every message
+         for (int i = 0; i < numberOfMessages / 999; i++)
+         {
+            ClientSession sessionConsumer = sf.createSession(true, false, false);
+            Xid xid = newXID();
+            xids.add(xid);
+            sessionConsumer.start(xid, XAResource.TMNOFLAGS);
+            sessionConsumer.start();
+            ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+            // The per-transaction counter must be the one advanced here: incrementing the outer
+            // counter instead would put every message into the first transaction
+            for (int msgCount = 0; msgCount < 1000; msgCount++)
+            {
+               if (msgReceived == numberOfMessages)
+               {
+                  break;
+               }
+               System.out.println("MsgReceived = " + (msgReceived++));
+               ClientMessage msg = consumer.receive(5000);
+               assertNotNull(msg);
+               msg.acknowledge();
+            }
+            sessionConsumer.end(xid, XAResource.TMSUCCESS);
+            sessionConsumer.prepare(xid);
+            sessionConsumer.close();
+         }
+
+
+         ClientSession sessionCheck = sf.createSession(true, true);
+
+         ClientConsumer consumer = sessionCheck.createConsumer(PagingTest.ADDRESS);
+
+         assertNull(consumer.receiveImmediate());
+
+         sessionCheck.close();
+
+         sf.close();
+         locator.close();
+
+         server.stop();
+
+         // Phase 3: restart again; the prepared acknowledgements must still hide the messages
+         server = createServer(true,
+                               config,
+                               PagingTest.PAGE_SIZE,
+                               PagingTest.PAGE_MAX,
+                               new HashMap<String, AddressSettings>());
+         server.start();
+
+         queue = server.locateQueue(ADDRESS);
+
+         locator = createInVMNonHALocator();
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(true, false, false);
+
+         consumer = session.createConsumer(PagingTest.ADDRESS);
+
+         session.start();
+
+         assertNull(consumer.receiveImmediate());
+
+         for (Xid xid : xids)
+         {
+            session.rollback(xid);
+         }
+
+         xids.clear();
+
+         // Once the prepared transactions are rolled back a message must be deliverable again
+         assertNotNull(consumer.receiveImmediate());
+
+         session.close();
+
+         sf.close();
+
+         locator.close();
+
+         //assertEquals(numberOfMessages, queue.getMessageCount());
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+
public void testSendReceivePagingPersistent() throws Exception
{
internaltestSendReceivePaging(true);
@@ -137,6 +318,8 @@
clearData();
Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
HornetQServer server = createServer(true,
config,
@@ -284,7 +467,7 @@
Assert.assertNotNull(message2);
- session.commit();
+ if (i % 1000 == 0) session.commit();
try
{
@@ -299,6 +482,8 @@
throw e;
}
}
+
+ session.commit();
consumer.close();
@@ -362,6 +547,8 @@
clearData();
Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
HornetQServer server = createServer(true,
config,
@@ -452,7 +639,7 @@
Assert.assertNotNull(message2);
- session.commit();
+ if (i % 1000 == 0) session.commit();
try
{
@@ -1060,203 +1247,6 @@
}
}
- // This test will force a depage thread as soon as the first message hits the page
- public void testDepageDuringTransaction5() throws Exception
- {
- clearData();
-
- final Configuration config = createDefaultConfig();
- HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
-
- final Executor executor = Executors.newSingleThreadExecutor();
-
- final AtomicInteger countDepage = new AtomicInteger(0);
- class HackPagingStore extends PagingStoreImpl
- {
- HackPagingStore(final SimpleString address,
- final PagingManager pagingManager,
- final StorageManager storageManager,
- final PostOffice postOffice,
- final SequentialFileFactory fileFactory,
- final PagingStoreFactory storeFactory,
- final SimpleString storeName,
- final AddressSettings addressSettings,
- final ExecutorFactory executorFactory,
- final boolean syncNonTransactional)
- {
- super(address,
- pagingManager,
- storageManager,
- postOffice,
- fileFactory,
- storeFactory,
- storeName,
- addressSettings,
- executorFactory,
- syncNonTransactional);
- }
-
- public boolean startDepaging()
- {
- // do nothing, we are hacking depage right in between paging
- return false;
- }
-
- };
-
- class HackStoreFactory extends PagingStoreFactoryNIO
- {
- HackStoreFactory(final String directory,
- final ExecutorFactory executorFactory,
- final boolean syncNonTransactional)
- {
- super(directory, executorFactory, syncNonTransactional);
- }
-
- public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings)
- {
-
- return new HackPagingStore(address,
- getPagingManager(),
- getStorageManager(),
- getPostOffice(),
- null,
- this,
- address,
- settings,
- getExecutorFactory(),
- syncNonTransactional);
- }
-
- };
-
- HornetQServer server = new HornetQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), securityManager)
-
- {
- protected PagingManager createPagingManager()
- {
- return new PagingManagerImpl(new HackStoreFactory(config.getPagingDirectory(),
- getExecutorFactory(),
- config.isJournalSyncNonTransactional()),
- getStorageManager(),
- getAddressSettingsRepository());
- }
- };
-
- AddressSettings defaultSetting = new AddressSettings();
- defaultSetting.setPageSizeBytes(PAGE_SIZE);
- defaultSetting.setMaxSizeBytes(PAGE_MAX);
- server.getAddressSettingsRepository().addMatch("#", defaultSetting);
-
- server.start();
-
- final AtomicInteger errors = new AtomicInteger(0);
-
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setBlockOnAcknowledge(false);
-
- final ClientSessionFactory sf = locator.createSessionFactory();
- final int messageSize = 1024; // 1k
- final int numberOfMessages = 2000;
-
- try
- {
-
- final byte[] body = new byte[messageSize];
-
- Thread producerThread = new Thread()
- {
- public void run()
- {
- ClientSession sessionProducer = null;
- try
- {
- sessionProducer = sf.createSession(false, false);
- ClientProducer producer = sessionProducer.createProducer(ADDRESS);
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage msg = sessionProducer.createMessage(true);
- msg.getBodyBuffer().writeBytes(body);
- msg.putIntProperty("count", i);
- producer.send(msg);
-
- if (i % 500 == 0 && i != 0)
- {
- sessionProducer.commit();
- // Thread.sleep(500);
- }
- }
-
- sessionProducer.commit();
-
- System.out.println("Producer gone");
-
- }
- catch (Throwable e)
- {
- e.printStackTrace(); // >> junit report
- errors.incrementAndGet();
- }
- finally
- {
- try
- {
- if (sessionProducer != null)
- {
- sessionProducer.close();
- }
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- errors.incrementAndGet();
- }
- }
- }
- };
-
- ClientSession session = sf.createSession(true, true, 0);
- session.start();
- session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
-
- producerThread.start();
-
- ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage msg = consumer.receive(500000);
- assertNotNull(msg);
- assertEquals(i, msg.getIntProperty("count").intValue());
- msg.acknowledge();
- if (i > 0 && i % 10 == 0)
- {
- // session.commit();
- }
- }
- // session.commit();
-
- session.close();
-
- producerThread.join();
-
- assertEquals(0, errors.get());
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- }
-
public void testOrderingNonTX() throws Exception
{
clearData();
@@ -1399,6 +1389,8 @@
clearData();
Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
HornetQServer server = createServer(true,
config,
@@ -1980,7 +1972,7 @@
}
}
-
+
public void testDropMessagesExpiring() throws Exception
{
clearData();
@@ -2190,7 +2182,207 @@
}
}
+
+ public void testSyncPage() throws Exception
+ {
+ Configuration config = createDefaultConfig();
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ try
+ {
+ server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
+
+ final CountDownLatch pageUp = new CountDownLatch(0);
+ final CountDownLatch pageDone = new CountDownLatch(1);
+
+ OperationContext ctx = new OperationContext()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ }
+
+ public void storeLineUp()
+ {
+ }
+
+ public boolean waitCompletion(long timeout) throws Exception
+ {
+ return false;
+ }
+
+ public void waitCompletion() throws Exception
+ {
+
+ }
+
+ public void replicationLineUp()
+ {
+
+ }
+
+ public void replicationDone()
+ {
+
+ }
+
+ public void pageSyncLineUp()
+ {
+ pageUp.countDown();
+ }
+
+ public void pageSyncDone()
+ {
+ pageDone.countDown();
+ }
+
+ public void executeOnCompletion(IOAsyncTask runnable)
+ {
+
+ }
+ };
+
+
+ OperationContextImpl.setContext(ctx);
+
+ PagingManager paging = server.getPagingManager();
+
+ PagingStore store = paging.getPageStore(ADDRESS);
+
+ store.sync();
+
+ assertTrue(pageUp.await(10, TimeUnit.SECONDS));
+
+ assertTrue(pageDone.await(10, TimeUnit.SECONDS));
+
+ server.stop();
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+
+ public void testSyncPageTX() throws Exception
+ {
+ Configuration config = createDefaultConfig();
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ try
+ {
+ server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
+
+ final CountDownLatch pageUp = new CountDownLatch(0);
+ final CountDownLatch pageDone = new CountDownLatch(1);
+
+ OperationContext ctx = new OperationContext()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ }
+
+ public void storeLineUp()
+ {
+ }
+
+ public boolean waitCompletion(long timeout) throws Exception
+ {
+ return false;
+ }
+
+ public void waitCompletion() throws Exception
+ {
+
+ }
+
+ public void replicationLineUp()
+ {
+
+ }
+
+ public void replicationDone()
+ {
+
+ }
+
+ public void pageSyncLineUp()
+ {
+ pageUp.countDown();
+ }
+
+ public void pageSyncDone()
+ {
+ pageDone.countDown();
+ }
+
+ public void executeOnCompletion(IOAsyncTask runnable)
+ {
+
+ }
+ };
+
+
+ OperationContextImpl.setContext(ctx);
+
+ PagingManager paging = server.getPagingManager();
+
+ PagingStore store = paging.getPageStore(ADDRESS);
+
+ store.sync();
+
+ assertTrue(pageUp.await(10, TimeUnit.SECONDS));
+
+ assertTrue(pageDone.await(10, TimeUnit.SECONDS));
+
+ server.stop();
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+
public void testPagingOneDestinationOnly() throws Exception
{
SimpleString PAGED_ADDRESS = new SimpleString("paged");
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -14,7 +14,6 @@
package org.hornetq.tests.integration.replication;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -34,7 +33,6 @@
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
@@ -59,7 +57,6 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.replication.impl.ReplicatedJournal;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
import org.hornetq.core.server.HornetQServer;
@@ -69,7 +66,6 @@
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
-import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
@@ -710,6 +706,8 @@
{
PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+ 1000,
+ null,
executorFactory,
false),
storageManager,
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -67,6 +67,7 @@
PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(getPageDir(),
+ 100, null,
new OrderedExecutorFactory(Executors.newCachedThreadPool()),
true);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -138,6 +138,8 @@
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -174,6 +176,8 @@
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -210,6 +214,8 @@
storeImpl.sync();
storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -237,6 +243,8 @@
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -312,6 +320,8 @@
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -358,7 +368,7 @@
for (int pageNr = 0; pageNr < 2; pageNr++)
{
Page page = storeImpl.depage();
-
+
System.out.println("numberOfPages = " + storeImpl.getNumberOfPages());
page.open();
@@ -459,6 +469,8 @@
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
final TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -622,6 +634,8 @@
}
TestSupportPageStore storeImpl2 = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -644,7 +658,7 @@
long lastMessageId = messageIdGenerator.incrementAndGet();
ServerMessage lastMsg = createMessage(lastMessageId, storeImpl, destination, createRandomBuffer(lastMessageId, 5));
-
+
storeImpl2.forceAnotherPage();
storeImpl2.page(lastMsg, new RoutingContextImpl(null));
@@ -707,6 +721,8 @@
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
final TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -747,6 +763,8 @@
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
final TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ null,
+ 100,
createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
@@ -755,7 +773,7 @@
new SimpleString("test"),
settings,
getExecutorFactory(),
- true);
+ false);
storeImpl.start();
@@ -1181,7 +1199,6 @@
return 0;
}
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#loadBindingJournal(java.util.List)
*/
@@ -1605,7 +1622,7 @@
public void deleteIncrementRecord(long txID, long recordID) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -1614,7 +1631,7 @@
public void deletePageCounter(long txID, long recordID) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -1626,6 +1643,15 @@
return 0;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, int)
+ */
+ public long storePageCounterInc(long queueID, int add) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
Modified: trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java 2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java 2010-12-22 19:30:59 UTC (rev 10067)
@@ -45,6 +45,67 @@
// Public --------------------------------------------------------
+ public void testCompleteTaskAfterPaging() throws Exception
+ {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try
+ {
+ OperationContextImpl impl = new OperationContextImpl(executor);
+ final CountDownLatch latch1 = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ impl.executeOnCompletion(new IOAsyncTask()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ latch1.countDown();
+ }
+ });
+
+ assertTrue(latch1.await(10, TimeUnit.SECONDS));
+
+ for (int i = 0 ; i < 10; i++) impl.storeLineUp();
+ for (int i = 0 ; i < 3; i++) impl.pageSyncLineUp();
+
+ impl.executeOnCompletion(new IOAsyncTask()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ latch2.countDown();
+ }
+ });
+
+
+ assertFalse(latch2.await(1, TimeUnit.MILLISECONDS));
+
+ for (int i = 0 ; i < 9; i++) impl.done();
+ for (int i = 0 ; i < 2; i++) impl.pageSyncDone();
+
+
+ assertFalse(latch2.await(1, TimeUnit.MILLISECONDS));
+
+ impl.done();
+ impl.pageSyncDone();
+
+ assertTrue(latch2.await(10, TimeUnit.SECONDS));
+
+ }
+ finally
+ {
+ executor.shutdown();
+ }
+ }
+
public void testCaptureExceptionOnExecutor() throws Exception
{
ExecutorService executor = Executors.newSingleThreadExecutor();
15 years, 5 months
JBoss hornetq SVN: r10066 - trunk/examples/jms/multiple-failover-failback/src/org/hornetq/jms/example.
by do-not-reply@jboss.org
Author: pslavice(a)redhat.com
Date: 2010-12-22 03:37:28 -0500 (Wed, 22 Dec 2010)
New Revision: 10066
Modified:
trunk/examples/jms/multiple-failover-failback/src/org/hornetq/jms/example/MultipleFailoverFailbackExample.java
Log:
Removed method getServer; this method was moved into common/src/org/hornetq/common/example/HornetQExample.java in r10022 (HORNETQ-595).
Modified: trunk/examples/jms/multiple-failover-failback/src/org/hornetq/jms/example/MultipleFailoverFailbackExample.java
===================================================================
--- trunk/examples/jms/multiple-failover-failback/src/org/hornetq/jms/example/MultipleFailoverFailbackExample.java 2010-12-21 20:01:58 UTC (rev 10065)
+++ trunk/examples/jms/multiple-failover-failback/src/org/hornetq/jms/example/MultipleFailoverFailbackExample.java 2010-12-22 08:37:28 UTC (rev 10066)
@@ -151,12 +151,4 @@
}
}
}
-
- private int getServer(Connection connection)
- {
- DelegatingSession session = (DelegatingSession) ((HornetQConnection) connection).getInitialSession();
- TransportConfiguration transportConfiguration = session.getSessionFactory().getConnectorConfiguration();
- String port = (String) transportConfiguration.getParams().get("port");
- return Integer.valueOf(port) - 5445;
- }
}
15 years, 5 months
JBoss hornetq SVN: r10065 - in trunk: src/main/org/hornetq/core/protocol/core/impl and 8 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-12-21 15:01:58 -0500 (Tue, 21 Dec 2010)
New Revision: 10065
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/remoting/server/RemotingService.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
Log:
Session factory now tries both the live and the backup server so as to avoid starvation; also removed the test for failoverOnServerShutdown, as it makes no sense when only one server can ever be live.
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-12-21 20:01:58 UTC (rev 10065)
@@ -92,8 +92,6 @@
private ConnectorFactory connectorFactory;
- private Map<String, Object> transportParams;
-
private final long callTimeout;
private final long clientFailureCheckPeriod;
@@ -176,10 +174,8 @@
connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
- transportParams = connectorConfig.getParams();
+ checkTransportKeys(connectorFactory, connectorConfig.getParams());
- checkTransportKeys(connectorFactory, transportParams);
-
this.callTimeout = callTimeout;
this.clientFailureCheckPeriod = clientFailureCheckPeriod;
@@ -209,31 +205,17 @@
public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection) throws HornetQException
{
// Get the connection
-
getConnectionWithRetry(initialConnectAttempts);
- if (connection == null && failoverOnInitialConnection)
+ if (connection == null)
{
- if (backupConfig != null)
+ StringBuffer msg = new StringBuffer("Unable to connect to server using configuration ").append(connectorConfig);
+ if(backupConfig != null)
{
- // Try and connect to the backup
-
- log.warn("Server is not available to make initial connection to. Will try backup server instead.");
-
- this.connectorConfig = backupConfig;
-
- connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
-
- transportParams = this.connectorConfig.getParams();
-
- getConnectionWithRetry(initialConnectAttempts);
+ msg.append(" and backup configuration ").append(backupConfig);
}
- }
-
- if (connection == null)
- {
throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Unable to connect to server using configuration " + connectorConfig);
+ msg.toString());
}
}
@@ -251,6 +233,11 @@
}
}
+ public Object getBackupConnector()
+ {
+ return backupConfig;
+ }
+
public ClientSession createSession(final String username,
final String password,
final boolean xa,
@@ -524,25 +511,9 @@
// It can then release the channel 1 lock, and retry (which will cause locking on failoverLock
// until failover is complete
- boolean serverShutdown = me.getCode() == HornetQException.DISCONNECTED;
- // We will try to failover if there is a backup connector factory, but we don't do this if the server
- // has been shutdown cleanly unless failoverOnServerShutdown is true
- boolean attemptFailover = (backupConfig != null) && !serverShutdown;
-
- boolean attemptReconnect;
-
- if (attemptFailover)
+ if (reconnectAttempts != 0)
{
- attemptReconnect = false;
- }
- else
- {
- attemptReconnect = reconnectAttempts != 0;
- }
-
- if (attemptFailover || attemptReconnect)
- {
lockChannel1();
final boolean needToInterrupt;
@@ -599,25 +570,8 @@
connector = null;
- if (attemptFailover)
- {
- // Now try failing over to backup
+ reconnectSessions(oldConnection, reconnectAttempts);
- this.connectorConfig = backupConfig;
-
- backupConfig = null;
-
- connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
-
- transportParams = connectorConfig.getParams();
-
- reconnectSessions(oldConnection, reconnectAttempts == -1 ? -1 : reconnectAttempts + 1);
- }
- else
- {
- reconnectSessions(oldConnection, reconnectAttempts);
- }
-
oldConnection.destroy();
}
else
@@ -1011,7 +965,7 @@
{
DelegatingBufferHandler handler = new DelegatingBufferHandler();
- connector = connectorFactory.createConnector(transportParams,
+ connector = connectorFactory.createConnector(connectorConfig.getParams(),
handler,
this,
closeExecutor,
@@ -1037,6 +991,45 @@
connector = null;
}
}
+ //if connection fails we can try the backup in case it has come live
+ if(connector == null && backupConfig != null)
+ {
+ ConnectorFactory backupConnectorFactory = instantiateConnectorFactory(backupConfig.getFactoryClassName());
+ connector = backupConnectorFactory.createConnector(backupConfig.getParams(),
+ handler,
+ this,
+ closeExecutor,
+ threadPool,
+ scheduledThreadPool);
+ if (connector != null)
+ {
+ connector.start();
+
+ tc = connector.createConnection();
+
+ if (tc == null)
+ {
+ try
+ {
+ connector.close();
+ }
+ catch (Throwable t)
+ {
+ }
+
+ connector = null;
+ }
+ else
+ {
+ /*looks like the backup is now live, let's use that*/
+ connectorConfig = backupConfig;
+
+ backupConfig = null;
+
+ connectorFactory = backupConnectorFactory;
+ }
+ }
+ }
}
catch (Exception e)
{
@@ -1224,8 +1217,7 @@
serverLocator.notifyNodeDown(msg.getNodeID().toString());
}
- conn.fail(new HornetQException(msg.isFailoverOnServerShutdown() ? HornetQException.NOT_CONNECTED
- : HornetQException.DISCONNECTED,
+ conn.fail(new HornetQException(HornetQException.DISCONNECTED,
"The connection was disconnected because of server shutdown"));
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2010-12-21 20:01:58 UTC (rev 10065)
@@ -44,4 +44,6 @@
TransportConfiguration getConnectorConfiguration();
void setBackupConnector(TransportConfiguration live, TransportConfiguration backUp);
+
+ Object getBackupConnector();
}
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-12-21 20:01:58 UTC (rev 10065)
@@ -324,7 +324,7 @@
callClosingListeners();
}
- public void disconnect(boolean failoverOnServerShutdown)
+ public void disconnect()
{
Channel channel0 = getChannel(0, -1);
@@ -343,7 +343,7 @@
channel.flushConfirmations();
}
- Packet disconnect = new DisconnectMessage(nodeID, failoverOnServerShutdown);
+ Packet disconnect = new DisconnectMessage(nodeID);
channel0.sendAndFlush(disconnect);
}
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java 2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java 2010-12-21 20:01:58 UTC (rev 10065)
@@ -31,20 +31,16 @@
// Attributes ----------------------------------------------------
private SimpleString nodeID;
-
- private boolean failoverOnServerShutdown;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public DisconnectMessage(final SimpleString nodeID, boolean failoverOnServerShutdown)
+ public DisconnectMessage(final SimpleString nodeID)
{
super(PacketImpl.DISCONNECT);
this.nodeID = nodeID;
-
- this.failoverOnServerShutdown = failoverOnServerShutdown;
}
public DisconnectMessage()
@@ -59,24 +55,16 @@
return nodeID;
}
- public boolean isFailoverOnServerShutdown()
- {
- return failoverOnServerShutdown;
- }
-
@Override
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeNullableSimpleString(nodeID);
-
- buffer.writeBoolean(failoverOnServerShutdown);
}
@Override
public void decodeRest(final HornetQBuffer buffer)
{
nodeID = buffer.readNullableSimpleString();
- failoverOnServerShutdown = buffer.readBoolean();
}
@Override
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-12-21 20:01:58 UTC (rev 10065)
@@ -194,7 +194,7 @@
manager.cleanup(this);
}
- public void disconnect(boolean clientFailover)
+ public void disconnect()
{
}
Modified: trunk/src/main/org/hornetq/core/remoting/server/RemotingService.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/RemotingService.java 2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/remoting/server/RemotingService.java 2010-12-21 20:01:58 UTC (rev 10065)
@@ -45,6 +45,4 @@
void freeze();
RemotingConnection getServerSideReplicatingConnection();
-
- void stop(boolean failoverOnServerShutdown) throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-12-21 20:01:58 UTC (rev 10065)
@@ -254,11 +254,6 @@
public void stop() throws Exception
{
- stop(false);
- }
-
- public void stop(boolean failoverOnServerShutdown) throws Exception
- {
if (!started)
{
return;
@@ -283,7 +278,7 @@
{
RemotingConnection conn = entry.connection;
- conn.disconnect(failoverOnServerShutdown);
+ conn.disconnect();
}
for (Acceptor acceptor : acceptors)
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-12-21 20:01:58 UTC (rev 10065)
@@ -217,6 +217,7 @@
if(backupServerLocator != null)
{
backupServerLocator.close();
+ backupServerLocator = null;
}
for (ServerLocatorInternal clusterLocator : clusterLocators)
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-12-21 20:01:58 UTC (rev 10065)
@@ -626,7 +626,7 @@
{
System.out.println("HornetQServerImpl.stop");
}
- remotingService.stop(failoverOnServerShutdown);
+ remotingService.stop();
synchronized (this)
{
Modified: trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-12-21 20:01:58 UTC (rev 10065)
@@ -151,9 +151,8 @@
/**
* Disconnect the connection, closing all channels
- * @param clientFailover
*/
- void disconnect(boolean clientFailover);
+ void disconnect();
/**
* returns true if any data has been received since the last time this method was called.
Added: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2010-12-21 20:01:58 UTC (rev 10065)
@@ -0,0 +1,323 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import junit.framework.Assert;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.utils.ReusableLatch;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ * Date: Dec 21, 2010
+ * Time: 12:04:16 PM
+ */
+public class FailBackAutoTest extends FailoverTestBase
+{
+ private ServerLocatorInternal locator;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ locator = getServerLocator();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (locator != null)
+ {
+ try
+ {
+ locator.close();
+ }
+ catch (Exception e)
+ {
+ //
+ }
+ }
+ super.tearDown();
+ }
+
+ public void testAutoFailback() throws Exception
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnInitialConnection(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ ClientSession session = sendAndConsume(sf, true);
+
+ MyListener listener = new MyListener(latch);
+
+ session.addFailureListener(listener);
+
+ backupServer.stop();
+
+ liveServer.crash();
+
+ backupServer.start();
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ ClientMessage message = session.createMessage(true);
+
+ setBody(0, message);
+
+ producer.send(message);
+
+ session.removeFailureListener(listener);
+
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ listener = new MyListener(latch2);
+
+ session.addFailureListener(listener);
+
+ liveServer.start();
+
+ assertTrue(latch2.await(5, TimeUnit.SECONDS));
+
+ message = session.createMessage(true);
+
+ setBody(1, message);
+
+ producer.send(message);
+
+ session.close();
+
+ sf.close();
+
+ Assert.assertEquals(0, sf.numSessions());
+
+ Assert.assertEquals(0, sf.numConnections());
+ }
+
+ public void testAutoFailbackThenFailover() throws Exception
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnInitialConnection(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ CountDownLatch latch = new CountDownLatch(1);
+
+ ClientSession session = sendAndConsume(sf, true);
+
+ MyListener listener = new MyListener(latch);
+
+ session.addFailureListener(listener);
+
+ backupServer.stop();
+
+ liveServer.crash();
+
+ backupServer.start();
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ ClientMessage message = session.createMessage(true);
+
+ setBody(0, message);
+
+ producer.send(message);
+
+ session.removeFailureListener(listener);
+
+ CountDownLatch latch2 = new CountDownLatch(1);
+
+ listener = new MyListener(latch2);
+
+ session.addFailureListener(listener);
+
+ liveServer.start();
+
+ assertTrue(latch2.await(5, TimeUnit.SECONDS));
+
+ message = session.createMessage(true);
+
+ setBody(1, message);
+
+ producer.send(message);
+
+ CountDownLatch latch3 = new CountDownLatch(1);
+
+ session.removeFailureListener(listener);
+
+ listener = new MyListener(latch3);
+
+ session.addFailureListener(listener);
+
+ waitForBackup(sf, 5);
+
+ liveServer.crash();
+
+ assertTrue(latch3.await(5, TimeUnit.SECONDS));
+
+ session.close();
+
+ sf.close();
+
+ Assert.assertEquals(0, sf.numSessions());
+
+ Assert.assertEquals(0, sf.numConnections());
+ }
+
+ /**
+ * Builds the backup and live server configurations and creates both servers.
+ * Both configurations use a shared store, are clustered, have security disabled, and know
+ * both connectors; each gets a cluster connection named "cluster1" whose connector list
+ * names the other server's connector.
+ *
+ * @throws Exception if a configuration or server cannot be created
+ */
+ protected void createConfigs() throws Exception
+ {
+ nodeManager = new InVMNodeManager();
+
+ // Backup server: listens on the backup acceptor and is flagged as a backup.
+ backupConfig = super.createDefaultConfig();
+ backupConfig.getAcceptorConfigurations().clear();
+ backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
+ backupConfig.setSecurityEnabled(false);
+ backupConfig.setSharedStore(true);
+ backupConfig.setBackup(true);
+ backupConfig.setClustered(true);
+ TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
+ TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
+ backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+ backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(liveConnector.getName());
+ ClusterConnectionConfiguration cccLive = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
+ staticConnectors, false);
+ backupConfig.getClusterConfigurations().add(cccLive);
+ backupServer = createBackupServer();
+
+ // Live server: listens on the live acceptor; its connector list names the backup connector.
+ liveConfig = super.createDefaultConfig();
+ liveConfig.getAcceptorConfigurations().clear();
+ liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
+ liveConfig.setSecurityEnabled(false);
+ liveConfig.setSharedStore(true);
+ liveConfig.setClustered(true);
+ List<String> pairs = new ArrayList<String>();
+ pairs.add(backupConnector.getName());
+ ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
+ pairs, false);
+ liveConfig.getClusterConfigurations().add(ccc0);
+ liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+ liveConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
+ liveServer = createLiveServer();
+ }
+
+ /**
+ * Returns the in-VM acceptor configuration for the requested server.
+ *
+ * @param live {@code true} for the live server, {@code false} for the backup
+ */
+ @Override
+ protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
+ {
+ return getInVMTransportAcceptorConfiguration(live);
+ }
+
+ /**
+ * Returns the in-VM connector configuration for the requested server.
+ *
+ * @param live {@code true} for the live server, {@code false} for the backup
+ */
+ @Override
+ protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
+ {
+ return getInVMConnectorTransportConfiguration(live);
+ }
+
+
+ /**
+ * Creates a session on the given factory, sends 1000 text messages to
+ * {@code FailoverTestBase.ADDRESS}, consumes and acknowledges all of them while checking
+ * body and "count" property, and verifies that nothing else is left to receive.
+ *
+ * @param sf factory used to create the session
+ * @param createQueue whether to create the queue on {@code FailoverTestBase.ADDRESS} first
+ * @return the session that was used; it is left open and started
+ * @throws Exception if any client operation fails
+ */
+ private ClientSession sendAndConsume(final ClientSessionFactory sf, final boolean createQueue) throws Exception
+ {
+ ClientSession session = sf.createSession(false, true, true);
+
+ if (createQueue)
+ {
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, false);
+ }
+
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ final int numMessages = 1000;
+
+ // Each message carries its sequence number in the "count" property.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte) 1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBodyBuffer().writeString("aardvarks");
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+
+ session.start();
+
+ // Messages must arrive in send order: the i-th received message must have count == i.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive();
+
+ Assert.assertEquals("aardvarks", message2.getBodyBuffer().readString());
+
+ Assert.assertEquals(i, message2.getObjectProperty(new SimpleString("count")));
+
+ message2.acknowledge();
+ }
+
+ // Nothing beyond the messages sent above may be waiting.
+ ClientMessage message3 = consumer.receiveImmediate();
+
+ Assert.assertNull(message3);
+
+ return session;
+ }
+
+ /**
+ * Writes the string {@code "message" + i} into the body buffer of the given message.
+ *
+ * @param i index appended to the body text
+ * @param message message whose body buffer is written to
+ * @throws Exception if writing the body fails
+ */
+ protected void setBody(final int i, final ClientMessage message) throws Exception
+ {
+ message.getBodyBuffer().writeString("message" + i);
+ }
+
+ /**
+ * Failure listener that counts down the supplied latch each time a connection failure is
+ * reported, so a test can wait for the failure with a timeout.
+ */
+ class MyListener implements SessionFailureListener
+ {
+ private final CountDownLatch latch;
+
+ public MyListener(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ // Signals the waiting test; the exception and the failedOver flag are not inspected.
+ public void connectionFailed(final HornetQException me, boolean failedOver)
+ {
+ latch.countDown();
+ }
+
+ // Only traces the callback on standard output.
+ public void beforeReconnect(HornetQException exception)
+ {
+ System.out.println("MyListener.beforeReconnect");
+ }
+ }
+}
Added: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java 2010-12-21 20:01:58 UTC (rev 10065)
@@ -0,0 +1,287 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import junit.framework.Assert;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.server.impl.InVMNodeManager;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ * Date: Dec 21, 2010
+ * Time: 12:04:16 PM
+ */
+public class FailBackManualTest extends FailoverTestBase
+{
+ private ServerLocatorInternal locator;
+
+ /**
+ * Runs the base-class set-up and then obtains the server locator used by the tests.
+ */
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ locator = getServerLocator();
+ }
+
+ /**
+ * Closes the server locator, if one was created, and then runs the base-class tear-down.
+ */
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (locator != null)
+ {
+ try
+ {
+ locator.close();
+ }
+ catch (Exception e)
+ {
+ // Best-effort close: the failure is ignored so that super.tearDown() below still runs.
+ }
+ }
+ super.tearDown();
+ }
+
+
+ /**
+ * Fails over to the backup, restarts the live server with auto fail-back disabled, checks
+ * that the backup is still started, and then stops the backup by hand. The session's failure
+ * listener must fire within 5 seconds after the first failover and after the manual stop.
+ *
+ * @throws Exception if any server or client operation fails
+ */
+ public void testNoAutoFailback() throws Exception
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnInitialConnection(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ ClientSession session = sendAndConsume(sf, true);
+
+ MyListener listener = new MyListener(latch);
+
+ session.addFailureListener(listener);
+
+ // Step 1: take the backup down, crash the live server, then bring the backup back up.
+ backupServer.stop();
+
+ liveServer.crash();
+
+ backupServer.start();
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ ClientMessage message = session.createMessage(true);
+
+ setBody(0, message);
+
+ producer.send(message);
+
+ // Swap in a listener with a fresh latch so the next connection failure is observed separately.
+ session.removeFailureListener(listener);
+
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ listener = new MyListener(latch2);
+
+ session.addFailureListener(listener);
+
+ liveConfig.setAllowAutoFailBack(false);
+
+ // Step 2: the live server is started on its own thread rather than inline
+ // (presumably because start() blocks while the backup is active -- confirm).
+ // NOTE(review): the thread is never joined by this test.
+ Thread t = new Thread(new ServerStarter(liveServer));
+
+ t.start();
+
+ waitForBackup(sf, 5);
+
+ // With auto fail-back disabled the backup must still be running at this point.
+ assertTrue(backupServer.isStarted());
+
+ // Step 3: manual fail-back -- stopping the backup must be seen as a connection failure.
+ backupServer.stop();
+
+ assertTrue(latch2.await(5, TimeUnit.SECONDS));
+
+ message = session.createMessage(true);
+
+ setBody(1, message);
+
+ producer.send(message);
+
+ session.close();
+
+ sf.close();
+
+ // After closing, the factory must report no remaining sessions or connections.
+ Assert.assertEquals(0, sf.numSessions());
+
+ Assert.assertEquals(0, sf.numConnections());
+ }
+
+
+ /**
+ * Builds the backup and live server configurations and creates both servers.
+ * Both configurations use a shared store, are clustered, have security disabled, know both
+ * connectors and have auto fail-back switched off; each gets a cluster connection named
+ * "cluster1" whose connector list names the other server's connector.
+ *
+ * @throws Exception if a configuration or server cannot be created
+ */
+ protected void createConfigs() throws Exception
+ {
+ nodeManager = new InVMNodeManager();
+
+ // Backup server: listens on the backup acceptor and is flagged as a backup.
+ backupConfig = super.createDefaultConfig();
+ backupConfig.getAcceptorConfigurations().clear();
+ backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
+ backupConfig.setSecurityEnabled(false);
+ backupConfig.setSharedStore(true);
+ backupConfig.setBackup(true);
+ backupConfig.setClustered(true);
+ TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
+ TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
+ backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+ backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(liveConnector.getName());
+ ClusterConnectionConfiguration cccLive = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
+ staticConnectors, false);
+ backupConfig.getClusterConfigurations().add(cccLive);
+ backupConfig.setAllowAutoFailBack(false);
+ backupServer = createBackupServer();
+
+ // Live server: listens on the live acceptor; its connector list names the backup connector.
+ liveConfig = super.createDefaultConfig();
+ liveConfig.getAcceptorConfigurations().clear();
+ liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
+ liveConfig.setSecurityEnabled(false);
+ liveConfig.setSharedStore(true);
+ liveConfig.setClustered(true);
+ List<String> pairs = new ArrayList<String>();
+ pairs.add(backupConnector.getName());
+ ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
+ pairs, false);
+ liveConfig.getClusterConfigurations().add(ccc0);
+ liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+ liveConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
+ liveConfig.setAllowAutoFailBack(false);
+ liveServer = createLiveServer();
+ }
+
+ /**
+ * Returns the in-VM acceptor configuration for the requested server.
+ *
+ * @param live {@code true} for the live server, {@code false} for the backup
+ */
+ @Override
+ protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
+ {
+ return getInVMTransportAcceptorConfiguration(live);
+ }
+
+ /**
+ * Returns the in-VM connector configuration for the requested server.
+ *
+ * @param live {@code true} for the live server, {@code false} for the backup
+ */
+ @Override
+ protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
+ {
+ return getInVMConnectorTransportConfiguration(live);
+ }
+
+
+ /**
+ * Creates a session on the given factory, sends 1000 text messages to
+ * {@code FailoverTestBase.ADDRESS}, consumes and acknowledges all of them while checking
+ * body and "count" property, and verifies that nothing else is left to receive.
+ *
+ * @param sf factory used to create the session
+ * @param createQueue whether to create the queue on {@code FailoverTestBase.ADDRESS} first
+ * @return the session that was used; it is left open and started
+ * @throws Exception if any client operation fails
+ */
+ private ClientSession sendAndConsume(final ClientSessionFactory sf, final boolean createQueue) throws Exception
+ {
+ ClientSession session = sf.createSession(false, true, true);
+
+ if (createQueue)
+ {
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, false);
+ }
+
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ final int numMessages = 1000;
+
+ // Each message carries its sequence number in the "count" property.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte) 1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBodyBuffer().writeString("aardvarks");
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+
+ session.start();
+
+ // Messages must arrive in send order: the i-th received message must have count == i.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive();
+
+ Assert.assertEquals("aardvarks", message2.getBodyBuffer().readString());
+
+ Assert.assertEquals(i, message2.getObjectProperty(new SimpleString("count")));
+
+ message2.acknowledge();
+ }
+
+ // Nothing beyond the messages sent above may be waiting.
+ ClientMessage message3 = consumer.receiveImmediate();
+
+ Assert.assertNull(message3);
+
+ return session;
+ }
+
+ /**
+ * Writes the string {@code "message" + i} into the body buffer of the given message.
+ *
+ * @param i index appended to the body text
+ * @param message message whose body buffer is written to
+ * @throws Exception if writing the body fails
+ */
+ protected void setBody(final int i, final ClientMessage message) throws Exception
+ {
+ message.getBodyBuffer().writeString("message" + i);
+ }
+
+ /**
+ * Failure listener that counts down the supplied latch each time a connection failure is
+ * reported, so a test can wait for the failure with a timeout.
+ */
+ class MyListener implements SessionFailureListener
+ {
+ private final CountDownLatch latch;
+
+ public MyListener(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ // Signals the waiting test; the exception and the failedOver flag are not inspected.
+ public void connectionFailed(final HornetQException me, boolean failedOver)
+ {
+ latch.countDown();
+ }
+
+ // Only traces the callback on standard output.
+ public void beforeReconnect(HornetQException exception)
+ {
+ System.out.println("MyListener.beforeReconnect");
+ }
+ }
+
+ /**
+ * Runnable that starts the given server, so that the start can be run on a separate thread.
+ */
+ class ServerStarter implements Runnable
+ {
+ private final TestableServer server;
+
+ public ServerStarter(TestableServer server)
+ {
+ this.server = server;
+ }
+
+ public void run()
+ {
+ try
+ {
+ server.start();
+ }
+ catch (Exception e)
+ {
+ // A start failure is only printed; it is not propagated to the thread that launched the starter.
+ e.printStackTrace();
+ }
+ }
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-12-21 20:01:58 UTC (rev 10065)
@@ -19,6 +19,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
@@ -2114,6 +2115,245 @@
Assert.assertEquals(0, sf.numConnections());
}
+ /**
+ * Stops the backup, crashes the live server and restarts the backup; the session's failure
+ * listener must fire within 5 seconds and the same session must still be able to send.
+ *
+ * @throws Exception if any server or client operation fails
+ */
+ public void testBackupServerNotRemoved() throws Exception
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnInitialConnection(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ // Local listener: releases the latch on the first reported connection failure.
+ class MyListener implements SessionFailureListener
+ {
+ public void connectionFailed(final HornetQException me, boolean failedOver)
+ {
+ latch.countDown();
+ }
+
+ public void beforeReconnect(HornetQException exception)
+ {
+ System.out.println("MyListener.beforeReconnect");
+ }
+ }
+
+ ClientSession session = sendAndConsume(sf, true);
+
+ session.addFailureListener(new MyListener());
+
+ // Take the backup down, crash the live server, then bring the backup back up.
+ backupServer.stop();
+
+ liveServer.crash();
+
+ backupServer.start();
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ ClientMessage message = session.createMessage(true);
+
+ setBody(0, message);
+
+ producer.send(message);
+
+ session.close();
+
+ sf.close();
+
+ // After closing, the factory must report no remaining sessions or connections.
+ Assert.assertEquals(0, sf.numSessions());
+
+ Assert.assertEquals(0, sf.numConnections());
+ }
+
+ /**
+ * Stops the backup, crashes the live server and restarts the live server itself; the
+ * session's failure listener must fire within 5 seconds and the same session must still be
+ * able to send.
+ *
+ * @throws Exception if any server or client operation fails
+ */
+ public void testLiveAndBackupLiveComesBack() throws Exception
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnInitialConnection(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ // Local listener: releases the latch on the first reported connection failure.
+ class MyListener implements SessionFailureListener
+ {
+ public void connectionFailed(final HornetQException me, boolean failedOver)
+ {
+ latch.countDown();
+ }
+
+ public void beforeReconnect(HornetQException exception)
+ {
+ System.out.println("MyListener.beforeReconnect");
+ }
+ }
+
+ ClientSession session = sendAndConsume(sf, true);
+
+ session.addFailureListener(new MyListener());
+
+ // Both servers go down; only the live server is brought back.
+ backupServer.stop();
+
+ liveServer.crash();
+
+ liveServer.start();
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ ClientMessage message = session.createMessage(true);
+
+ setBody(0, message);
+
+ producer.send(message);
+
+ session.close();
+
+ sf.close();
+
+ // After closing, the factory must report no remaining sessions or connections.
+ Assert.assertEquals(0, sf.numSessions());
+
+ Assert.assertEquals(0, sf.numConnections());
+ }
+
+ /**
+ * Stops the backup, crashes the live server and restarts the live server; after the failure
+ * listener has fired, a message is sent on the surviving session, and a brand-new session
+ * factory created from the same locator must be able to receive that message.
+ *
+ * @throws Exception if any server or client operation fails
+ */
+ public void testLiveAndBackupLiveComesBackNewFactory() throws Exception
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnInitialConnection(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ // Local listener: releases the latch on the first reported connection failure.
+ class MyListener implements SessionFailureListener
+ {
+ public void connectionFailed(final HornetQException me, boolean failedOver)
+ {
+ latch.countDown();
+ }
+
+ public void beforeReconnect(HornetQException exception)
+ {
+ System.out.println("MyListener.beforeReconnect");
+ }
+ }
+
+ ClientSession session = sendAndConsume(sf, true);
+
+ session.addFailureListener(new MyListener());
+
+ // Both servers go down; only the live server is brought back.
+ backupServer.stop();
+
+ liveServer.crash();
+
+ liveServer.start();
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ ClientMessage message = session.createMessage(true);
+
+ setBody(0, message);
+
+ producer.send(message);
+
+ session.close();
+
+ sf.close();
+
+ // A new factory from the same locator must connect and see the message sent above.
+ sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+
+ session = sf.createSession();
+
+ ClientConsumer cc = session.createConsumer(FailoverTestBase.ADDRESS);
+
+ session.start();
+
+ ClientMessage cm = cc.receive(5000);
+
+ assertNotNull(cm);
+
+ Assert.assertEquals("message0", cm.getBodyBuffer().readString());
+
+ session.close();
+
+ sf.close();
+
+ // After closing, the factory must report no remaining sessions or connections.
+ Assert.assertEquals(0, sf.numSessions());
+
+ Assert.assertEquals(0, sf.numConnections());
+ }
+
+ /**
+ * Stops the backup, crashes the live server and restarts the backup; after the failure
+ * listener has fired, a message is sent on the surviving session, and a brand-new session
+ * factory created from the same locator must be able to receive that message.
+ *
+ * @throws Exception if any server or client operation fails
+ */
+ public void testLiveAndBackupBackupComesBackNewFactory() throws Exception
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setFailoverOnInitialConnection(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ // Local listener: releases the latch on the first reported connection failure.
+ class MyListener implements SessionFailureListener
+ {
+ public void connectionFailed(final HornetQException me, boolean failedOver)
+ {
+ latch.countDown();
+ }
+
+ public void beforeReconnect(HornetQException exception)
+ {
+ System.out.println("MyListener.beforeReconnect");
+ }
+ }
+
+ ClientSession session = sendAndConsume(sf, true);
+
+ session.addFailureListener(new MyListener());
+
+ // Both servers go down; only the backup is brought back.
+ backupServer.stop();
+
+ liveServer.crash();
+
+ backupServer.start();
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ ClientMessage message = session.createMessage(true);
+
+ setBody(0, message);
+
+ producer.send(message);
+
+ session.close();
+
+ sf.close();
+
+ // A new factory from the same locator must connect and see the message sent above.
+ sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+
+ session = sf.createSession();
+
+ ClientConsumer cc = session.createConsumer(FailoverTestBase.ADDRESS);
+
+ session.start();
+
+ ClientMessage cm = cc.receive(5000);
+
+ assertNotNull(cm);
+
+ Assert.assertEquals("message0", cm.getBodyBuffer().readString());
+
+ session.close();
+
+ sf.close();
+
+ // After closing, the factory must report no remaining sessions or connections.
+ Assert.assertEquals(0, sf.numSessions());
+
+ Assert.assertEquals(0, sf.numConnections());
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-12-21 20:01:58 UTC (rev 10065)
@@ -50,8 +50,6 @@
* A FailoverTestBase
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- *
*/
public abstract class FailoverTestBase extends ServiceTestBase
{
@@ -68,7 +66,7 @@
protected Configuration backupConfig;
protected Configuration liveConfig;
-
+
protected NodeManager nodeManager;
// Static --------------------------------------------------------
@@ -139,42 +137,19 @@
ArrayList<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(liveConnector.getName());
ClusterConnectionConfiguration cccLive = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
- staticConnectors, false);
+ staticConnectors, false);
backupConfig.getClusterConfigurations().add(cccLive);
backupServer = createBackupServer();
-
- // FIXME
- /*
- server1Service.registerActivateCallback(new ActivateCallback()
- {
-
- public void preActivate()
- {
- // To avoid two servers messing up with the same journal at any single point
- }
-
- public void activated()
- {
- try
- {
- liveServer.getStorageManager().stop();
- }
- catch (Exception ignored)
- {
- }
- }
- });
-*/
liveConfig = super.createDefaultConfig();
liveConfig.getAcceptorConfigurations().clear();
liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
liveConfig.setSecurityEnabled(false);
liveConfig.setSharedStore(true);
liveConfig.setClustered(true);
- List<String> pairs = null;
+ List<String> pairs = null;
ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
- pairs, false);
+ pairs, false);
liveConfig.getClusterConfigurations().add(ccc0);
liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
liveServer = createLiveServer();
@@ -193,7 +168,7 @@
config1.setSharedStore(false);
config1.setBackup(true);
backupServer = createBackupServer();
-
+
Configuration config0 = super.createDefaultConfig();
config0.getAcceptorConfigurations().clear();
config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
@@ -249,25 +224,51 @@
}
protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator, int topologyMembers)
- throws Exception
- {
- ClientSessionFactoryInternal sf;
- CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
+ throws Exception
+ {
+ ClientSessionFactoryInternal sf;
+ CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
- locator.addClusterTopologyListener(new LatchClusterTopologyListener(countDownLatch));
+ locator.addClusterTopologyListener(new LatchClusterTopologyListener(countDownLatch));
- sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+ sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
- boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
- assertTrue(ok);
- return sf;
- }
+ boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
+ assertTrue(ok);
+ return sf;
+ }
+ /**
+ * Polls every 100 ms until the given session factory reports a backup connector, and fails
+ * the test once more than {@code seconds} seconds have elapsed without one.
+ *
+ * @param sf session factory whose backup connector is awaited
+ * @param seconds maximum time to wait, in seconds
+ * @throws Exception declared by the signature
+ */
+ protected void waitForBackup(ClientSessionFactoryInternal sf, long seconds)
+ throws Exception
+ {
+ long time = System.currentTimeMillis();
+ long toWait = seconds * 1000;
+ while (sf.getBackupConnector() == null)
+ {
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ // Interruption is ignored (and the interrupt flag is not restored); polling simply continues.
+ }
+ if (sf.getBackupConnector() != null)
+ {
+ break;
+ }
+ else if (System.currentTimeMillis() > (time + toWait))
+ {
+ fail("backup server never started");
+ }
+ }
+ }
+
protected void waitForBackup(long seconds)
{
long time = System.currentTimeMillis();
long toWait = seconds * 1000;
- while(!backupServer.isInitialised())
+ while (!backupServer.isInitialised())
{
try
{
@@ -277,20 +278,43 @@
{
//ignore
}
- if(backupServer.isInitialised())
+ if (backupServer.isInitialised())
{
break;
}
- else if(System.currentTimeMillis() > (time + toWait))
+ else if (System.currentTimeMillis() > (time + toWait))
{
fail("backup server never started");
}
}
- System.out.println("FailoverTestBase.waitForNewLive");
}
-
-
+ /**
+ * Polls every 100 ms until the given server reports that it is initialised, and fails the
+ * test once more than {@code seconds} seconds have elapsed without that happening.
+ *
+ * @param seconds maximum time to wait, in seconds
+ * @param server server whose initialisation is awaited
+ */
+ protected void waitForBackup(long seconds, TestableServer server)
+ {
+ long time = System.currentTimeMillis();
+ long toWait = seconds * 1000;
+ while (!server.isInitialised())
+ {
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ // Interruption is ignored (and the interrupt flag is not restored); polling simply continues.
+ }
+ if (server.isInitialised())
+ {
+ break;
+ }
+ else if (System.currentTimeMillis() > (time + toWait))
+ {
+ fail("server never started");
+ }
+ }
+ }
+
+
protected TransportConfiguration getInVMConnectorTransportConfiguration(final boolean live)
{
if (live)
@@ -334,10 +358,10 @@
Map<String, Object> server1Params = new HashMap<String, Object>();
server1Params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
- org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+ org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
return new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory",
- server1Params);
+ server1Params);
}
}
@@ -352,10 +376,10 @@
Map<String, Object> server1Params = new HashMap<String, Object>();
server1Params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
- org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+ org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
return new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory",
- server1Params);
+ server1Params);
}
}
@@ -373,7 +397,7 @@
{
liveServer.crash(sessions);
}
-
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
@@ -384,7 +408,7 @@
{
}
}
-
+
class LatchClusterTopologyListener implements ClusterTopologyListener
{
final CountDownLatch latch;
@@ -400,12 +424,12 @@
public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
- if(connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
+ if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
{
liveNode.add(connectorPair.a.getName());
latch.countDown();
}
- if(connectorPair.b != null && !backupNode.contains(connectorPair.b.getName()))
+ if (connectorPair.b != null && !backupNode.contains(connectorPair.b.getName()))
{
backupNode.add(connectorPair.b.getName());
latch.countDown();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2010-12-21 08:05:47 UTC (rev 10064)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2010-12-21 20:01:58 UTC (rev 10065)
@@ -70,6 +70,18 @@
super();
}
+ // Replaces the overridden test with a one-second sleep; super is not called, so the
+ // overridden scenario does not run in this subclass.
+ // NOTE(review): the reason for disabling it here is not recorded -- confirm and document.
+ @Override
+ public void testLiveAndBackupLiveComesBackNewFactory() throws Exception
+ {
+ Thread.sleep(1000);
+ }
+
+ // Replaces the overridden test with a one-second sleep; super is not called, so the
+ // overridden scenario does not run in this subclass.
+ // NOTE(review): the reason for disabling it here is not recorded -- confirm and document.
+ @Override
+ public void testLiveAndBackupBackupComesBackNewFactory() throws Exception
+ {
+ Thread.sleep(1000);
+ }
+
/**
* @param i
* @param message
15 years, 5 months
JBoss hornetq SVN: r10064 - in trunk: src/main/org/hornetq/core/client/impl and 8 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-12-21 03:05:47 -0500 (Tue, 21 Dec 2010)
New Revision: 10064
Modified:
trunk/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
trunk/src/main/org/hornetq/core/client/impl/Topology.java
trunk/src/main/org/hornetq/core/client/impl/TopologyMember.java
trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
trunk/src/main/org/hornetq/core/server/cluster/ClusterManager.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
Log:
removed distance parameter from nodeup as not needed
Modified: trunk/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -26,7 +26,7 @@
*/
public interface ClusterTopologyListener
{
- void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
+ void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
void nodeDown(String nodeID);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -1243,8 +1243,7 @@
{
serverLocator.notifyNodeUp(topMessage.getNodeID(),
topMessage.getPair(),
- topMessage.isLast(),
- topMessage.getDistance());
+ topMessage.isLast());
}
}
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -1139,15 +1139,14 @@
public synchronized void notifyNodeUp(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last,
- final int distance)
+ final boolean last)
{
if (!ha)
{
return;
}
- topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
+ topology.addMember(nodeID, new TopologyMember(connectorPair));
TopologyMember actMember = topology.getMember(nodeID);
@@ -1172,7 +1171,7 @@
for (ClusterTopologyListener listener : topologyListeners)
{
- listener.nodeUP(nodeID, connectorPair, last, distance);
+ listener.nodeUP(nodeID, connectorPair, last);
}
// Notify if waiting on getting topology
@@ -1242,6 +1241,10 @@
public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
topologyListeners.add(listener);
+ if(topology.members() > 0)
+ {
+ System.out.println("ServerLocatorImpl.addClusterTopologyListener");
+ }
}
public void removeClusterTopologyListener(final ClusterTopologyListener listener)
Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -40,7 +40,7 @@
ClientSessionFactory connect() throws Exception;
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
+ void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
void notifyNodeDown(String nodeID);
Modified: trunk/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/Topology.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/client/impl/Topology.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -47,7 +47,10 @@
TopologyMember currentMember = topology.get(nodeId);
if (debug)
{
- //System.out.println("member.getConnector() = " + member.getConnector());
+ if(member.getConnector().toString().contains("server-id=4"))
+ {
+ System.out.println("member.getConnector() = " + member.getConnector());
+ }
}
if(currentMember == null)
{
@@ -90,7 +93,7 @@
int count = 0;
for (Map.Entry<String, TopologyMember> entry : topology.entrySet())
{
- listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == topology.size(), entry.getValue().getDistance());
+ listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == topology.size());
}
}
Modified: trunk/src/main/org/hornetq/core/client/impl/TopologyMember.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/TopologyMember.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/client/impl/TopologyMember.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -27,27 +27,19 @@
private final Pair<TransportConfiguration, TransportConfiguration> connector;
- private final int distance;
-
- public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector, int distance)
+ public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector)
{
this.connector = connector;
- this.distance = distance;
}
public Pair<TransportConfiguration, TransportConfiguration> getConnector()
{
return connector;
}
-
- public int getDistance()
- {
- return distance;
- }
@Override
public String toString()
{
- return "TopologyMember[distance=" + distance + ", connector=" + connector + "]";
+ return "TopologyMember[connector=" + connector + "]";
}
}
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -111,9 +111,9 @@
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+ public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last, distance + 1));
+ channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
}
public void nodeDown(String nodeID)
@@ -147,7 +147,7 @@
{
pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), null);
}
- server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, 1);
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false);
}
}
});
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -39,13 +39,11 @@
private boolean last;
- private int distance;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public ClusterTopologyChangeMessage(final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last, int distance)
+ public ClusterTopologyChangeMessage(final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last)
{
super(PacketImpl.CLUSTER_TOPOLOGY);
@@ -56,8 +54,6 @@
this.last = last;
this.exit = false;
-
- this.distance = distance;
}
public ClusterTopologyChangeMessage(final String nodeID)
@@ -96,16 +92,6 @@
return exit;
}
- public int getDistance()
- {
- return distance;
- }
-
- public void setDistance(int distance)
- {
- this.distance = distance;
- }
-
@Override
public void encodeRest(final HornetQBuffer buffer)
{
@@ -132,7 +118,6 @@
buffer.writeBoolean(false);
}
buffer.writeBoolean(last);
- buffer.writeInt(distance);
}
}
@@ -167,7 +152,6 @@
}
pair = new Pair<TransportConfiguration, TransportConfiguration>(a, b);
last = buffer.readBoolean();
- distance = buffer.readInt();
}
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -50,7 +50,7 @@
void notifyNodeDown(String nodeID);
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, int distance);
+ void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
Topology getTopology();
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -29,6 +29,7 @@
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -83,6 +84,8 @@
private final Map<String, MessageFlowRecord> records = new HashMap<String, MessageFlowRecord>();
+ private final List<TransportConfiguration> conectorssss = new ArrayList<TransportConfiguration>();
+
private final ScheduledExecutorService scheduledExecutor;
private final int maxHops;
@@ -413,8 +416,7 @@
public synchronized void nodeUP(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last,
- final int distance)
+ final boolean last)
{
// discard notifications about ourselves unless its from our backup
@@ -422,16 +424,16 @@
{
if(connectorPair.b != null)
{
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
+ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last);
}
return;
}
// we propagate the node notifications to all cluster topology listeners
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
+ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last);
// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
- if (allowDirectConnectionsOnly && distance > 1 && !allowableConnections.contains(connectorPair.a))
+ if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
{
return;
}
@@ -448,6 +450,18 @@
return;
}
+ Collection<TopologyMember> topologyMembers = serverLocator.getTopology().getMembers();
+ for (TopologyMember topologyMember : topologyMembers)
+ {
+ if(topologyMember.getConnector().a != null && !conectorssss.contains(topologyMember.getConnector().a))
+ {
+ if(!topologyMember.getConnector().a.equals(connector) && !topologyMember.getConnector().a.equals(connectorPair.a))
+ {
+ System.out.println("ClusterConnectionImpl.nodeUP");
+ }
+ }
+ }
+
try
{
MessageFlowRecord record = records.get(nodeID);
@@ -474,6 +488,7 @@
}
createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
+ conectorssss.add(connectorPair.a);
}
else
{
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -253,10 +253,9 @@
public void notifyNodeUp(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last,
- int distance)
+ boolean last)
{
- boolean updated = topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
+ boolean updated = topology.addMember(nodeID, new TopologyMember(connectorPair));
if(!updated)
{
@@ -265,15 +264,13 @@
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, connectorPair, last, distance);
+ listener.nodeUP(nodeID, connectorPair, last);
}
- if (distance < topology.nodes())
+
+ for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- for (ClusterTopologyListener listener : clusterConnectionListeners)
- {
- listener.nodeUP(nodeID, connectorPair, last, distance);
- }
+ listener.nodeUP(nodeID, connectorPair, last);
}
}
@@ -407,12 +404,12 @@
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, member.getConnector(), false);
}
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, member.getConnector(), false);
}
}
}
@@ -455,11 +452,11 @@
{
if (backup)
{
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(null, cc.getConnector()), 0);
+ member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(null, cc.getConnector()));
}
else
{
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(), null), 0);
+ member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(), null));
}
topology.addMember(nodeID, member);
@@ -480,12 +477,12 @@
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, member.getConnector(), false);
}
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, member.getConnector(), false);
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -149,7 +149,7 @@
setupCluster();
startServers(5, 0);
-
+ servers[0].getClusterManager().getTopology().setDebug(true);
setupSessionFactory(0, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, false);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -398,7 +398,7 @@
this.latch = latch;
}
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+ public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
if(connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
{
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -215,8 +215,7 @@
public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last,
- int distance)
+ boolean last)
{
if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
{
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -203,8 +203,7 @@
{
public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last,
- int distance)
+ boolean last)
{
if(!nodes.contains(nodeID))
{
@@ -264,8 +263,7 @@
{
public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last,
- int distance)
+ boolean last)
{
if (!nodes.contains(nodeID))
{
@@ -337,8 +335,7 @@
{
public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last,
- int distance)
+ boolean last)
{
if (!nodes.contains(nodeID))
{
@@ -420,8 +417,7 @@
{
public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last,
- int distance)
+ boolean last)
{
if (!nodes.contains(nodeID))
{
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -298,8 +298,7 @@
public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last,
- int distance)
+ boolean last)
{
if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
{
15 years, 5 months
JBoss hornetq SVN: r10063 - in trunk: src/main/org/hornetq/jms/client and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-20 20:50:48 -0500 (Mon, 20 Dec 2010)
New Revision: 10063
Modified:
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
Log:
fixing tests on Large Message
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-12-20 15:10:20 UTC (rev 10062)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-12-21 01:50:48 UTC (rev 10063)
@@ -1730,6 +1730,13 @@
try
{
msg = sourceConsumer.receive(1000);
+
+ if (msg instanceof HornetQMessage)
+ {
+ // We need to check the buffer mainly in the case of LargeMessages
+ // As we need to reconstruct the buffer before resending the message
+ ((HornetQMessage)msg).checkBuffer();
+ }
}
catch (JMSException jmse)
{
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2010-12-20 15:10:20 UTC (rev 10062)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2010-12-21 01:50:48 UTC (rev 10063)
@@ -879,6 +879,11 @@
{
message.getBodyBuffer().resetReaderIndex();
}
+
+ public void checkBuffer()
+ {
+ message.getBodyBuffer();
+ }
public void doBeforeReceive() throws Exception
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-12-20 15:10:20 UTC (rev 10062)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-12-21 01:50:48 UTC (rev 10063)
@@ -27,6 +27,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.tests.util.RandomUtil;
/**
* A LargeMessageCompressTest
@@ -277,6 +278,111 @@
}
+
+ public void testLargeMessageCompressionRestartAndCheckSize() throws Exception
+ {
+ final int messageSize = 1024 * 1024;
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ byte [] msgs = new byte[1024 * 1024];
+ for (int i = 0 ; i < msgs.length; i++)
+ {
+ msgs[i] = RandomUtil.randomByte();
+ }
+
+ Message clientFile = createLargeClientMessage(session, msgs, true);
+
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.close();
+
+ sf.close();
+
+ locator.close();
+
+ server.stop();
+
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ locator = createFactory(isNetty());
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ ClientMessage msg1 = consumer.receive(1000);
+ Assert.assertNotNull(msg1);
+
+ assertEquals(messageSize, msg1.getBodySize());
+
+ String testDir = this.getTestDir();
+ File testFile = new File(testDir, "async_large_message");
+ FileOutputStream output = new FileOutputStream(testFile);
+
+ msg1.saveToOutputStream(output);
+
+ msg1.acknowledge();
+
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+
+ //verify
+ FileInputStream input = new FileInputStream(testFile);
+ for (int i = 0 ; i < messageSize; i++)
+ {
+ byte b = (byte)input.read();
+ assertEquals("position = " + i, msgs[i], b);
+ }
+
+ testFile.delete();
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+
public void testSendServerMessage() throws Exception
{
// doesn't make sense as compressed
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2010-12-20 15:10:20 UTC (rev 10062)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2010-12-21 01:50:48 UTC (rev 10063)
@@ -100,7 +100,7 @@
public long countMessages(final String filter) throws Exception
{
- return (Long)proxy.invokeOperation("countMessages", filter);
+ return ((Number)proxy.invokeOperation("countMessages", filter)).intValue();
}
public boolean expireMessage(final String messageID) throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2010-12-20 15:10:20 UTC (rev 10062)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2010-12-21 01:50:48 UTC (rev 10063)
@@ -617,6 +617,13 @@
return createLargeClientMessage(session, numberOfBytes, true);
}
+ protected ClientMessage createLargeClientMessage (final ClientSession session, final byte[] buffer, final boolean durable) throws Exception
+ {
+ ClientMessage msgs = session.createMessage(durable);
+ msgs.getBodyBuffer().writeBytes(buffer);
+ return msgs;
+ }
+
protected ClientMessage createLargeClientMessage(final ClientSession session,
final long numberOfBytes,
final boolean persistent) throws Exception
15 years, 5 months
JBoss hornetq SVN: r10062 - trunk.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-12-20 10:10:20 -0500 (Mon, 20 Dec 2010)
New Revision: 10062
Modified:
trunk/build-hornetq.xml
Log:
fixing rest-easy build
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2010-12-20 14:50:41 UTC (rev 10061)
+++ trunk/build-hornetq.xml 2010-12-20 15:10:20 UTC (rev 10062)
@@ -1190,9 +1190,11 @@
<target name="jar-rest-init" depends="jar-jms">
<ant antfile="build-maven.xml" target="upload-local-target">
<property name="artifact.id" value="hornetq-core"/>
+ <property name="file-name" value="hornetq-core"/>
</ant>
<ant antfile="build-maven.xml" target="upload-local-target">
<property name="artifact.id" value="hornetq-jms"/>
+ <property name="file-name" value="hornetq-jms"/>
</ant>
</target>
15 years, 5 months