JBoss hornetq SVN: r9773 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-12 03:46:46 -0400 (Tue, 12 Oct 2010)
New Revision: 9773
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
Log:
ping test fix
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2010-10-12 07:30:09 UTC (rev 9772)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2010-10-12 07:46:46 UTC (rev 9773)
@@ -235,7 +235,7 @@
// We need to get it to stop pinging after one
- //((FailoverManagerImpl)csf.getFailoverManagers()[0]).stopPingingAfterOne();
+ csf.stopPingingAfterOne();
RemotingConnection serverConn = null;
13 years, 6 months
JBoss hornetq SVN: r9772 - in branches/2_2_0_HA_Improvements: tests/src/org/hornetq/tests/integration/remoting and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-12 03:30:09 -0400 (Tue, 12 Oct 2010)
New Revision: 9772
Modified:
branches/2_2_0_HA_Improvements/build-hornetq.xml
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
Log:
couple of test fixes
Modified: branches/2_2_0_HA_Improvements/build-hornetq.xml
===================================================================
--- branches/2_2_0_HA_Improvements/build-hornetq.xml 2010-10-11 23:37:26 UTC (rev 9771)
+++ branches/2_2_0_HA_Improvements/build-hornetq.xml 2010-10-12 07:30:09 UTC (rev 9772)
@@ -1387,6 +1387,7 @@
<exclude name="**/cluster/failover/*SharedStoreDistributionTest.class"/>
<exclude name="**/cluster/failover/*ReplicatedNettyAsynchronousFailoverTest.class"/>
<exclude name="**/cluster/*ReplicatedJMSFailoverTest.class"/>
+ <exclude name="**/integration/replication/*.class"/>
<exclude name="**/cluster/failover/Remote*.class"/>
<include name="${tests.param}"/>
</fileset>
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java 2010-10-11 23:37:26 UTC (rev 9771)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java 2010-10-12 07:30:09 UTC (rev 9772)
@@ -102,7 +102,6 @@
});
- locator.close();
server.stop();
Thread.sleep((pingPeriod * 2));
@@ -116,6 +115,7 @@
Assert.assertEquals(1, count.get());
+ locator.close();
}
finally
{
13 years, 6 months
JBoss hornetq SVN: r9771 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-11 19:37:26 -0400 (Mon, 11 Oct 2010)
New Revision: 9771
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Transactions on cursors acks
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-11 21:40:49 UTC (rev 9770)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-11 23:37:26 UTC (rev 9771)
@@ -17,16 +17,15 @@
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
+import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.Pair;
-import org.hornetq.core.paging.Page;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
@@ -126,7 +125,7 @@
if (!match)
{
- confirmPagePosition(message.a);
+ processACK(message.a);
}
}
@@ -142,6 +141,18 @@
public void ack(final PagePosition position) throws Exception
{
store.storeCursorAcknowledge(cursorId, position);
+ store.afterCompleteOperations(new IOAsyncTask()
+ {
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ processACK(position);
+ }
+ });
}
public void ackTx(final Transaction tx, final PagePosition position) throws Exception
@@ -173,7 +184,7 @@
*/
public void reloadPreparedACK(final Transaction tx, final PagePosition position)
{
- internalAdd(position);
+ // internalAdd(position);
installTXCallback(tx, position);
}
@@ -189,8 +200,7 @@
{
PageCursorInfo positions = getPageInfo(pos);
- positions.confirmed.incrementAndGet();
- positions.acks.add(pos);
+ positions.addACK(pos);
lastPosition = pos;
if (previousPos != null)
@@ -270,14 +280,13 @@
// Private -------------------------------------------------------
- private void confirmPagePosition(final PagePosition pos)
+ // To be called only after the ACK has been processed and guaranteed to be on storage
+ // The only exception is on non storage events such as not matching messages
+ private void processACK(final PagePosition pos)
{
PageCursorInfo info = getPageInfo(pos);
- if (info.confirmed.incrementAndGet() == info.getNumberOfMessages())
- {
- // todo delete previous destinations
- }
+ info.addACK(pos);
}
/**
@@ -311,13 +320,21 @@
tx.addOperation(cursorTX);
}
+ cursorTX.addPositionConfirmation(this, position);
+
}
+
+ // A callback from the PageCursorInfo. It will be called when all the messages on a page have been acked
+ private void onPageDone(PageCursorInfo info)
+ {
+ System.out.println("Page " + info.getPageId() + " has completed");
+ }
// Inner classes -------------------------------------------------
- private static class PageCursorInfo
+ private class PageCursorInfo
{
// Number of messages existent on this page
private final int numberOfMessages;
@@ -325,7 +342,7 @@
private final long pageId;
// Confirmed ACKs on this page
- private final List<PagePosition> acks = new LinkedList<PagePosition>();
+ private final List<PagePosition> acks = Collections.synchronizedList(new LinkedList<PagePosition>());
// We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or expressions
private final AtomicInteger confirmed = new AtomicInteger(0);
@@ -343,6 +360,11 @@
{
return numberOfMessages;
}
+
+ public boolean isDone()
+ {
+ return numberOfMessages == confirmed.get();
+ }
/**
* @return the pageId
@@ -354,7 +376,16 @@
public void addACK(final PagePosition posACK)
{
- this.acks.add(posACK);
+ if (posACK.getRecordID() > 0)
+ {
+ // We store these elements for later cleanup
+ this.acks.add(posACK);
+ }
+
+ if (numberOfMessages == confirmed.incrementAndGet())
+ {
+ PageCursorImpl.this.onPageDone(this);
+ }
}
}
@@ -410,7 +441,7 @@
for (PagePosition confirmed : positions)
{
- cursor.confirmPagePosition(confirmed);
+ cursor.processACK(confirmed);
}
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-11 21:40:49 UTC (rev 9770)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-11 23:37:26 UTC (rev 9771)
@@ -115,6 +115,7 @@
while ((msg = cursor.moveNext()) != null)
{
assertEquals(key++, msg.b.getIntProperty("key").intValue());
+ cursor.ack(msg.a);
}
assertEquals(NUM_MESSAGES, key);
@@ -270,21 +271,25 @@
server.start();
cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
for (int i = 10; i <= 20; i++)
{
Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getIntProperty("key").intValue());
- cursor.ack(msg.a);
+ cursor.ackTx(tx,msg.a);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
assertEquals(i, msg.b.getIntProperty("key").intValue());
- cursor.ack(msg.a);
+ cursor.ackTx(tx,msg.a);
}
+ tx.commit();
+
}
13 years, 6 months
JBoss hornetq SVN: r9770 - branches/2_2_0_HA_Improvements.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-11 17:40:49 -0400 (Mon, 11 Oct 2010)
New Revision: 9770
Modified:
branches/2_2_0_HA_Improvements/build-hornetq.xml
Log:
commented out jms replicated test
Modified: branches/2_2_0_HA_Improvements/build-hornetq.xml
===================================================================
--- branches/2_2_0_HA_Improvements/build-hornetq.xml 2010-10-11 21:16:17 UTC (rev 9769)
+++ branches/2_2_0_HA_Improvements/build-hornetq.xml 2010-10-11 21:40:49 UTC (rev 9770)
@@ -1386,7 +1386,7 @@
<exclude name="**/cluster/failover/*ReplicatedDistributionTest.class"/>
<exclude name="**/cluster/failover/*SharedStoreDistributionTest.class"/>
<exclude name="**/cluster/failover/*ReplicatedNettyAsynchronousFailoverTest.class"/>
- <exclude name="**/cluster/failover/*ReplicatedJMSFailoverTest.class"/>
+ <exclude name="**/cluster/*ReplicatedJMSFailoverTest.class"/>
<exclude name="**/cluster/failover/Remote*.class"/>
<include name="${tests.param}"/>
</fileset>
13 years, 6 months
JBoss hornetq SVN: r9769 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 6 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-11 17:16:17 -0400 (Mon, 11 Oct 2010)
New Revision: 9769
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
just backing up the current state of my work
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-11 21:16:17 UTC (rev 9769)
@@ -37,6 +37,9 @@
SimpleString getAddress();
int getNumberOfPages();
+
+ // The current page in which the system is writing files
+ int getCurrentWritingPage();
SimpleString getStoreName();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java 2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java 2010-10-11 21:16:17 UTC (rev 9769)
@@ -28,6 +28,8 @@
Page getPage();
int getNumberOfMessages();
+
+ void setMessages(ServerMessage[] messages);
/**
*
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-11 21:16:17 UTC (rev 9769)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.cursor;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.server.ServerMessage;
@@ -37,7 +38,7 @@
// Public --------------------------------------------------------
- PageCache getPageCache(long pageId) throws Exception;
+ PageCache getPageCache(PagePosition pos);
PagingStore getAssociatedStore();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2010-10-11 21:16:17 UTC (rev 9769)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.cursor;
+
/**
* A PagePosition
*
@@ -33,6 +34,15 @@
long getPageNr();
int getMessageNr();
+
+ void setPageCache(PageCache pageCache);
+
+ /**
+ * PagePosition will hold the page with a weak reference.
+ * So, this could eventually be null in case the soft-cache was released
+ * @return
+ */
+ PageCache getPageCache();
PagePosition nextMessage();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-11 21:16:17 UTC (rev 9769)
@@ -14,20 +14,29 @@
package org.hornetq.core.paging.cursor.impl;
import java.util.Collections;
-import java.util.Deque;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.transaction.TransactionPropertyIndexes;
/**
* A PageCursorImpl
@@ -50,11 +59,15 @@
private final PagingStore pageStore;
private final PageCursorProvider cursorProvider;
+
+ private final Executor executor;
private volatile PagePosition lastPosition;
private List<PagePosition> recoveredACK;
+ private SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
+
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
@@ -65,12 +78,14 @@
public PageCursorImpl(final PageCursorProvider cursorProvider,
final PagingStore pageStore,
final StorageManager store,
+ final Executor executor,
final long cursorId)
{
this.pageStore = pageStore;
this.store = store;
this.cursorProvider = cursorProvider;
this.cursorId = cursorId;
+ this.executor = executor;
}
// Public --------------------------------------------------------
@@ -98,19 +113,23 @@
boolean match = false;
Pair<PagePosition, ServerMessage> message = null;
+
do
{
message = cursorProvider.getAfter(lastPosition);
+
if (message != null)
{
lastPosition = message.a;
- }
- match = match(message.b);
- if (!match)
- {
- ignored(message.a);
+ match = match(message.b);
+
+ if (!match)
+ {
+ confirmPagePosition(message.a);
+ }
}
+
}
while (message != null && !match);
@@ -130,11 +149,6 @@
store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
installTXCallback(tx, position);
- // It needs to persist, otherwise the cursor will return to the fist page position
- tx.setContainsPersistent();
-
-
- // tx.afterCommit()
}
/* (non-Javadoc)
@@ -173,6 +187,11 @@
PagePosition previousPos = null;
for (PagePosition pos : recoveredACK)
{
+ PageCursorInfo positions = getPageInfo(pos);
+
+ positions.confirmed.incrementAndGet();
+ positions.acks.add(pos);
+
lastPosition = pos;
if (previousPos != null)
{
@@ -183,6 +202,9 @@
while (true)
{
Pair<PagePosition, ServerMessage> msgCheck = cursorProvider.getAfter(tmpPos);
+
+ positions = getPageInfo(tmpPos);
+
// end of the hole, we can finish processing here
// It may be also that the next was just a next page, so we just ignore it
if (msgCheck == null || msgCheck.a.equals(pos))
@@ -195,6 +217,12 @@
{
redeliver(msgCheck.a);
}
+ else
+ {
+ // The reference was ignored. But we must take a count from the reference count
+ // otherwise the page will never be deleted hence we would never leave paging even if everything was consumed
+ positions.confirmed.incrementAndGet();
+ }
}
tmpPos = msgCheck.a;
}
@@ -210,9 +238,29 @@
}
}
+ /**
+ * @param page
+ * @return
+ */
+ private PageCursorInfo getPageInfo(PagePosition pos)
+ {
+ PageCursorInfo pageInfo = consumedPages.get(pos.getPageNr());
+
+ if (pageInfo == null)
+ {
+ PageCache cache = cursorProvider.getPageCache(pos);
+ pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages());
+ consumedPages.put(pos.getPageNr(), pageInfo);
+ }
+
+ return pageInfo;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+
+
protected boolean match(final ServerMessage message)
{
@@ -221,10 +269,15 @@
}
// Private -------------------------------------------------------
-
- private void ignored(final PagePosition message)
+
+ private void confirmPagePosition(final PagePosition pos)
{
- // TODO: Update reference counts
+ PageCursorInfo info = getPageInfo(pos);
+
+ if (info.confirmed.incrementAndGet() == info.getNumberOfMessages())
+ {
+ // todo delete previous destinations
+ }
}
/**
@@ -246,9 +299,136 @@
*/
private void installTXCallback(Transaction tx, PagePosition position)
{
- //TODO: Play with rollbacks on the reference counts
+ // It needs to persist, otherwise the cursor will return to the first page position
+ tx.setContainsPersistent();
+
+ PageCursorTX cursorTX = (PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
+
+ if (cursorTX == null)
+ {
+ cursorTX = new PageCursorTX();
+ tx.putProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS,cursorTX);
+ tx.addOperation(cursorTX);
+ }
+
+
}
// Inner classes -------------------------------------------------
+
+
+ private static class PageCursorInfo
+ {
+ // Number of messages existent on this page
+ private final int numberOfMessages;
+
+ private final long pageId;
+
+ // Confirmed ACKs on this page
+ private final List<PagePosition> acks = new LinkedList<PagePosition>();
+
+ // We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or expressions
+ private final AtomicInteger confirmed = new AtomicInteger(0);
+
+ public PageCursorInfo(final long pageId, final int numberOfMessages)
+ {
+ this.pageId = pageId;
+ this.numberOfMessages = numberOfMessages;
+ }
+ /**
+ * @return the numberOfMessages
+ */
+ public int getNumberOfMessages()
+ {
+ return numberOfMessages;
+ }
+
+ /**
+ * @return the pageId
+ */
+ public long getPageId()
+ {
+ return pageId;
+ }
+
+ public void addACK(final PagePosition posACK)
+ {
+ this.acks.add(posACK);
+ }
+
+ }
+
+ static class PageCursorTX implements TransactionOperation
+ {
+ HashMap<PageCursorImpl, List<PagePosition>> pendingPositions = new HashMap<PageCursorImpl, List<PagePosition>>();
+
+ public void addPositionConfirmation(PageCursorImpl cursor, PagePosition position)
+ {
+ List<PagePosition> list = pendingPositions.get(cursor);
+
+ if (list == null)
+ {
+ list = new LinkedList<PagePosition>();
+ pendingPositions.put(cursor, list);
+ }
+
+ list.add(position);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforePrepare(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterPrepare(Transaction tx)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforeCommit(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterCommit(Transaction tx)
+ {
+ for (Entry<PageCursorImpl, List<PagePosition>> entry : this.pendingPositions.entrySet())
+ {
+ PageCursorImpl cursor = entry.getKey();
+
+ List<PagePosition> positions = entry.getValue();
+
+ for (PagePosition confirmed : positions)
+ {
+ cursor.confirmPagePosition(confirmed);
+ }
+
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforeRollback(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterRollback(Transaction tx)
+ {
+ }
+ }
+
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-11 21:16:17 UTC (rev 9769)
@@ -26,6 +26,7 @@
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.SoftValueHashMap;
import org.jboss.netty.util.internal.ConcurrentHashMap;
@@ -46,21 +47,26 @@
// Attributes ----------------------------------------------------
private final PagingStore pagingStore;
-
+
private final StorageManager storageManager;
-
- private SoftValueHashMap<Long, PageCacheImpl> softCache = new SoftValueHashMap<Long, PageCacheImpl>();
-
+
+ private final ExecutorFactory executorFactory;
+
+ private SoftValueHashMap<Long, PageCache> softCache = new SoftValueHashMap<Long, PageCache>();
+
private ConcurrentMap<Long, PageCursor> activeCursors = new ConcurrentHashMap<Long, PageCursor>();
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public PageCursorProviderImpl(final PagingStore pagingStore, final StorageManager storageManager)
+ public PageCursorProviderImpl(final PagingStore pagingStore,
+ final StorageManager storageManager,
+ final ExecutorFactory executorFactory)
{
this.pagingStore = pagingStore;
this.storageManager = storageManager;
+ this.executorFactory = executorFactory;
}
// Public --------------------------------------------------------
@@ -78,20 +84,23 @@
PageCursor activeCursor = activeCursors.get(cursorID);
if (activeCursor == null)
{
- activeCursor = new PageCursorImpl(this, pagingStore, storageManager, cursorID);
+ activeCursor = new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), cursorID);
PageCursor previousValue = activeCursors.putIfAbsent(cursorID, activeCursor);
if (previousValue != null)
{
activeCursor = previousValue;
}
}
-
+
return activeCursor;
}
-
+
+ /**
+ * this will create a non-persistent cursor
+ */
public PageCursor createCursor()
{
- return new PageCursorImpl(this, pagingStore, storageManager, 0);
+ return new PageCursorImpl(this, pagingStore, storageManager, executorFactory.getExecutor(), 0);
}
/* (non-Javadoc)
@@ -100,100 +109,55 @@
public Pair<PagePosition, ServerMessage> getAfter(final PagePosition pos) throws Exception
{
// TODO: consider page transactions here to avoid receiving an uncommitted message
- // TODO: consider the case where a page came empty because of an ignored PageTX
+ // TODO: consider the case where a full page is ignored because of a TX
PagePosition retPos = pos.nextMessage();
-
- PageCache cache = getPageCache(pos.getPageNr());
-
+
+ PageCache cache = getPageCache(pos);
+
if (retPos.getMessageNr() >= cache.getNumberOfMessages())
{
retPos = pos.nextPage();
-
- cache = getPageCache(retPos.getPageNr());
+
+ cache = getPageCache(retPos);
+
if (cache == null)
{
return null;
}
-
+
if (retPos.getMessageNr() >= cache.getNumberOfMessages())
{
return null;
}
}
-
+
return new Pair<PagePosition, ServerMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
}
-
+
public ServerMessage getMessage(final PagePosition pos) throws Exception
{
- PageCache cache = getPageCache(pos.getPageNr());
-
+ PageCache cache = getPageCache(pos);
+
if (pos.getMessageNr() >= cache.getNumberOfMessages())
{
// sanity check, this should never happen unless there's a bug
throw new IllegalStateException("Invalid messageNumber passed = " + pos);
}
-
+
return cache.getMessage(pos.getMessageNr());
}
- public PageCache getPageCache(final long pageId) throws Exception
+ public PageCache getPageCache(PagePosition pos)
{
- boolean needToRead = false;
- PageCacheImpl cache = null;
- synchronized (this)
+ PageCache cache = pos.getPageCache();
+ if (cache == null)
{
- if (pageId > pagingStore.getNumberOfPages())
- {
- return null;
- }
-
- cache = softCache.get(pageId);
- if (cache == null)
- {
- cache = createPageCache(pageId);
- needToRead = true;
- // anyone reading from this cache will have to wait reading to finish first
- // we also want only one thread reading this cache
- cache.lock();
- softCache.put(pageId, cache);
- }
+ cache = getPageCache(pos.getPageNr());
+ pos.setPageCache(cache);
}
-
- // Reading is done outside of the synchronized block, however
- // the page stays locked until the entire reading is finished
- if (needToRead)
- {
- try
- {
- Page page = pagingStore.createPage((int)pageId);
-
- page.open();
-
- List<PagedMessage> pgdMessages = page.read();
-
- ServerMessage srvMessages[] = new ServerMessage[pgdMessages.size()];
-
- int i = 0;
- for (PagedMessage pdgMessage : pgdMessages)
- {
- ServerMessage message = pdgMessage.getMessage(storageManager);
- srvMessages[i++] = message;
- }
-
- cache.setMessages(srvMessages);
-
- }
- finally
- {
- cache.unlock();
- }
- }
-
-
return cache;
}
-
+
public int getCacheSize()
{
return softCache.size();
@@ -206,7 +170,7 @@
cursor.processReload();
}
}
-
+
public void stop()
{
activeCursors.clear();
@@ -215,7 +179,7 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
+
protected PageCacheImpl createPageCache(final long pageId) throws Exception
{
return new PageCacheImpl(pagingStore.createPage((int)pageId));
@@ -223,6 +187,69 @@
// Private -------------------------------------------------------
+ private PageCache getPageCache(final long pageId)
+ {
+ try
+ {
+ boolean needToRead = false;
+ PageCache cache = null;
+ synchronized (this)
+ {
+ if (pageId > pagingStore.getCurrentWritingPage())
+ {
+ return null;
+ }
+
+ cache = softCache.get(pageId);
+ if (cache == null)
+ {
+ cache = createPageCache(pageId);
+ needToRead = true;
+ // anyone reading from this cache will have to wait reading to finish first
+ // we also want only one thread reading this cache
+ cache.lock();
+ softCache.put(pageId, cache);
+ }
+ }
+
+ // Reading is done outside of the synchronized block, however
+ // the page stays locked until the entire reading is finished
+ if (needToRead)
+ {
+ try
+ {
+ Page page = pagingStore.createPage((int)pageId);
+
+ page.open();
+
+ List<PagedMessage> pgdMessages = page.read();
+
+ ServerMessage srvMessages[] = new ServerMessage[pgdMessages.size()];
+
+ int i = 0;
+ for (PagedMessage pdgMessage : pgdMessages)
+ {
+ ServerMessage message = pdgMessage.getMessage(storageManager);
+ srvMessages[i++] = message;
+ }
+
+ cache.setMessages(srvMessages);
+
+ }
+ finally
+ {
+ cache.unlock();
+ }
+ }
+
+ return cache;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Couldn't complete paging due to an IO Exception on Paging - " + e.getMessage(), e);
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-11 21:16:17 UTC (rev 9769)
@@ -13,6 +13,10 @@
package org.hornetq.core.paging.cursor.impl;
+import java.lang.ref.WeakReference;
+
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PagePosition;
/**
@@ -30,6 +34,8 @@
/** ID used for storage */
private long recordID;
+
+ private volatile WeakReference<PageCache> cacheReference;
/**
* @param pageNr
@@ -42,6 +48,12 @@
this.messageNr = messageNr;
}
+ public PagePositionImpl(long pageNr, int messageNr, PageCache pageCache)
+ {
+ this(pageNr, messageNr);
+ this.setPageCache(pageCache);
+ }
+
/**
* @param pageNr
* @param messageNr
@@ -52,6 +64,31 @@
}
/**
+ * The cached page associated with this position
+ * @return
+ */
+ public PageCache getPageCache()
+ {
+ if (cacheReference == null)
+ {
+ return null;
+ }
+ else
+ {
+ return cacheReference.get();
+ }
+ }
+
+ public void setPageCache(final PageCache cache)
+ {
+ if (cache != null)
+ {
+ this.cacheReference = new WeakReference<PageCache>(cache);
+ }
+ }
+
+
+ /**
* @return the recordID
*/
public long getRecordID()
@@ -117,7 +154,7 @@
public PagePosition nextMessage()
{
- return new PagePositionImpl(this.pageNr, this.messageNr + 1);
+ return new PagePositionImpl(this.pageNr, this.messageNr + 1, this.getPageCache());
}
public PagePosition nextPage()
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-10-11 21:16:17 UTC (rev 9769)
@@ -13,7 +13,6 @@
package org.hornetq.core.paging.impl;
-import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -35,7 +34,7 @@
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
*/
-public class PageImpl implements Page
+public class PageImpl implements Page, Comparable<Page>
{
// Constants -----------------------------------------------------
@@ -241,13 +240,54 @@
{
return "PageImpl::pageID=" + this.pageId + ", file=" + this.file;
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Comparable#compareTo(java.lang.Object)
+ */
+ public int compareTo(Page otherPage)
+ {
+ return otherPage.getPageId() - this.pageId;
+ }
+
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + pageId;
+ return result;
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ PageImpl other = (PageImpl)obj;
+ if (pageId != other.pageId)
+ return false;
+ return true;
+ }
+
/**
* @param position
* @param msgNumber
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2010-10-11 21:16:17 UTC (rev 9769)
@@ -98,7 +98,7 @@
this,
address,
settings,
- executorFactory.getExecutor(),
+ executorFactory,
syncNonTransactional);
}
@@ -202,7 +202,7 @@
this,
address,
settings,
- executorFactory.getExecutor(),
+ executorFactory,
syncNonTransactional);
storesReturn.add(store);
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-11 21:16:17 UTC (rev 9769)
@@ -53,6 +53,7 @@
import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.utils.ExecutorFactory;
/**
*
@@ -150,7 +151,7 @@
final PagingStoreFactory storeFactory,
final SimpleString storeName,
final AddressSettings addressSettings,
- final Executor executor,
+ final ExecutorFactory executorFactory,
final boolean syncNonTransactional)
{
if (pagingManager == null)
@@ -182,7 +183,7 @@
pageSize);
}
- this.executor = executor;
+ this.executor = executorFactory.getExecutor();
this.pagingManager = pagingManager;
@@ -192,7 +193,7 @@
this.syncNonTransactional = syncNonTransactional;
- this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager);
+ this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory);
// Post office could be null on the backup node
if (postOffice == null)
@@ -279,6 +280,11 @@
{
return numberOfPages;
}
+
+ public int getCurrentWritingPage()
+ {
+ return currentPageId;
+ }
public SimpleString getStoreName()
{
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2010-10-11 21:16:17 UTC (rev 9769)
@@ -31,4 +31,6 @@
public static final int REFS_OPERATION = 6;
public static final int PAGE_MESSAGES_OPERATION = 7;
+
+ public static final int PAGE_CURSOR_POSITIONS = 8;
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-10-11 21:16:17 UTC (rev 9769)
@@ -999,7 +999,7 @@
final PagingStoreFactory storeFactory,
final SimpleString storeName,
final AddressSettings addressSettings,
- final Executor executor,
+ final ExecutorFactory executorFactory,
final boolean syncNonTransactional)
{
super(address,
@@ -1010,7 +1010,7 @@
storeFactory,
storeName,
addressSettings,
- executor,
+ executorFactory,
syncNonTransactional);
}
@@ -1073,7 +1073,7 @@
this,
address,
settings,
- getExecutorFactory().getExecutor(),
+ getExecutorFactory(),
syncNonTransactional);
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2010-10-11 21:16:17 UTC (rev 9769)
@@ -43,6 +43,7 @@
import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.OrderedExecutorFactory;
/**
@@ -292,7 +293,7 @@
factoryField.setAccessible(true);
OrderedExecutorFactory factory = (org.hornetq.utils.OrderedExecutorFactory)factoryField.get(this);
- return new FailingPagingStore(destinationName, settings, factory.getExecutor(), syncNonTransactional);
+ return new FailingPagingStore(destinationName, settings, factory, syncNonTransactional);
}
// Package protected ---------------------------------------------
@@ -312,7 +313,7 @@
*/
public FailingPagingStore(final SimpleString storeName,
final AddressSettings addressSettings,
- final Executor executor,
+ final ExecutorFactory executor,
final boolean syncNonTransactional)
{
super(storeName,
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-11 21:16:17 UTC (rev 9769)
@@ -26,6 +26,7 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.server.HornetQServer;
@@ -78,11 +79,11 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager());
+ PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(), server.getExecutorFactory());
for (int i = 0; i < numberOfPages; i++)
{
- PageCache cache = cursorProvider.getPageCache(i + 1);
+ PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(i + 1, 0));
System.out.println("Page " + i + " had " + cache.getNumberOfMessages() + " messages");
}
@@ -104,7 +105,7 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager());
+ PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(), server.getExecutorFactory());
PageCursor cursor = cursorProvider.createCursor();
@@ -134,9 +135,9 @@
System.out.println("NumberOfPages = " + numberOfPages);
- PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager());
+ PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS), server.getStorageManager(), server.getExecutorFactory());
- PageCache cache = cursorProvider.getPageCache(2);
+ PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(2,0));
assertNull(cache);
}
@@ -287,11 +288,16 @@
}
- public void testRollbackScenarios() throws Exception
+ public void testRollbackScenariosOnACK() throws Exception
{
}
+ public void testReadRolledBackData() throws Exception
+ {
+
+ }
+
public void testPrepareScenarios() throws Exception
{
@@ -316,6 +322,11 @@
{
}
+
+ public void testFirstMessageInTheMiddle() throws Exception
+ {
+
+ }
/**
* @param numMessages
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-11 13:42:51 UTC (rev 9768)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-11 21:16:17 UTC (rev 9769)
@@ -73,6 +73,7 @@
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.UUID;
/**
@@ -143,7 +144,7 @@
null,
PagingStoreImplTest.destinationTestName,
addressSettings,
- executor,
+ getExecutorFactory(),
true);
storeImpl.start();
@@ -179,7 +180,7 @@
storeFactory,
PagingStoreImplTest.destinationTestName,
addressSettings,
- executor,
+ getExecutorFactory(),
true);
storeImpl.start();
@@ -215,7 +216,7 @@
null,
PagingStoreImplTest.destinationTestName,
addressSettings,
- executor,
+ getExecutorFactory(),
true);
storeImpl.start();
@@ -242,7 +243,7 @@
storeFactory,
PagingStoreImplTest.destinationTestName,
addressSettings,
- executor,
+ getExecutorFactory(),
true);
storeImpl.start();
@@ -317,7 +318,7 @@
storeFactory,
PagingStoreImplTest.destinationTestName,
addressSettings,
- executor,
+ getExecutorFactory(),
true);
storeImpl.start();
@@ -464,7 +465,7 @@
storeFactory,
new SimpleString("test"),
settings,
- executor,
+ getExecutorFactory(),
true);
storeImpl.start();
@@ -627,7 +628,7 @@
storeFactory,
new SimpleString("test"),
settings,
- executor,
+ getExecutorFactory(),
true);
storeImpl2.start();
@@ -710,7 +711,7 @@
storeFactory,
new SimpleString("test"),
settings,
- executor,
+ getExecutorFactory(),
true);
storeImpl.start();
@@ -854,6 +855,18 @@
{
return new FakePostOffice();
}
+
+ private ExecutorFactory getExecutorFactory()
+ {
+ return new ExecutorFactory()
+ {
+
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+ };
+ }
private ServerMessage createMessage(final long id,
final PagingStore store,
13 years, 6 months
JBoss hornetq SVN: r9768 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-11 09:42:51 -0400 (Mon, 11 Oct 2010)
New Revision: 9768
Modified:
trunk/docs/user-manual/en/appserver-integration.xml
Log:
https://jira.jboss.org/browse/HORNETQ-462 - added to JCA section
Modified: trunk/docs/user-manual/en/appserver-integration.xml
===================================================================
--- trunk/docs/user-manual/en/appserver-integration.xml 2010-10-11 12:34:44 UTC (rev 9767)
+++ trunk/docs/user-manual/en/appserver-integration.xml 2010-10-11 13:42:51 UTC (rev 9768)
@@ -28,7 +28,7 @@
outflow of messages sent from other JEE components, e.g. EJBs and Servlets.</para>
<para>This section explains the basics behind configuring the different JEE components in the
AS.</para>
- <section>
+ <section id="configuring-mdbs">
<title>Configuring Message-Driven Beans</title>
<para>The delivery of messages to an MDB using HornetQ is configured on the JCA Adapter via
a configuration file <literal>ra.xml</literal> which can be found under the <literal
@@ -980,6 +980,202 @@
</section>
</section>
<section>
+ <title>Configuring the JBoss Application Server to connect to Remote HornetQ Server</title>
+ <para>This is a step by step guide on how to configure a JBoss application server that doesn't have HornetQ installed
+ to use a remote instance of HornetQ</para>
+ <section>
+ <title>Configuring Jboss 5</title>
+ <para>Firstly download and install JBoss AS 5 as per the JBoss installation guide and HornetQ as per the
+ HornetQ installation guide. After that the following steps are required:</para>
+ <itemizedlist>
+ <listitem>
+ <para>Copy the following jars from the HornetQ distribution to the <literal>lib</literal> directory of
+ whichever JBoss AS configuration you have chosen, e.g. <literal>default</literal>.</para>
+ <itemizedlist>
+ <listitem>
+ <para>hornetq-core-client.jar</para>
+ </listitem>
+ <listitem>
+ <para>hornetq-jms-client.jar</para>
+ </listitem>
+ <listitem>
+ <para>hornetq-ra.jar (this can be found inside the <literal>hornetq-ra.rar</literal> archive)</para>
+ </listitem>
+ <listitem>
+ <para>netty.jar</para>
+ </listitem>
+ </itemizedlist>
+ </listitem>
+ <listitem>
+ <para>Create the directories <literal>hornetq-ra.rar</literal> and <literal>hornetq-ra.rar/META-INF</literal>
+ under the <literal>deploy</literal> directory in your JBoss config directory</para>
+ </listitem>
+ <listitem>
+ <para>under the <literal>hornetq-ra.rar/META-INF</literal> create a <literal>ra.xml</literal> file or
+ copy it from the HornetQ distribution (again it can be found in the <literal>hornetq-ra.rar</literal> archive)
+ and configure it as follows</para>
+ <programlisting>
+ <?xml version="1.0" encoding="UTF-8"?>
+
+ <connector xmlns="http://java.sun.com/xml/ns/j2ee"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee
+ http://java.sun.com/xml/ns/j2ee/connector_1_5.xsd"
+ version="1.5">
+
+ <description>HornetQ 2.0 Resource Adapter Alternate Configuration</description>
+ <display-name>HornetQ 2.0 Resource Adapter Alternate Configuration</display-name>
+
+ <vendor-name>Red Hat Middleware LLC</vendor-name>
+ <eis-type>JMS 1.1 Server</eis-type>
+ <resourceadapter-version>1.0</resourceadapter-version>
+
+ <license>
+ <description>
+ Copyright 2009 Red Hat, Inc.
+ Red Hat licenses this file to you under the Apache License, version
+ 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied. See the License for the specific language governing
+ permissions and limitations under the License.
+ </description>
+ <license-required>true</license-required>
+ </license>
+
+ <resourceadapter>
+ <resourceadapter-class>org.hornetq.ra.HornetQResourceAdapter</resourceadapter-class>
+ <config-property>
+ <description>The transport type</description>
+ <config-property-name>ConnectorClassName</config-property-name>
+ <config-property-type>java.lang.String</config-property-type>
+ <config-property-value>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</config-property-value>
+ </config-property>
+ <config-property>
+ <description>The transport configuration. These values must be in the form of key=val;key=val;</description>
+ <config-property-name>ConnectionParameters</config-property-name>
+ <config-property-type>java.lang.String</config-property-type>
+ <emphasis role="bold"> <config-property-value>host=127.0.0.1;port=5445</config-property-value></emphasis>
+ </config-property>
+
+ <outbound-resourceadapter>
+ <connection-definition>
+ <managedconnectionfactory-class>org.hornetq.ra.HornetQRAManagedConnectionFactory</managedconnectionfactory-class>
+
+ <config-property>
+ <description>The default session type</description>
+ <config-property-name>SessionDefaultType</config-property-name>
+ <config-property-type>java.lang.String</config-property-type>
+ <config-property-value>javax.jms.Queue</config-property-value>
+ </config-property>
+ <config-property>
+ <description>Try to obtain a lock within specified number of seconds; less than or equal to 0 disable this functionality</description>
+ <config-property-name>UseTryLock</config-property-name>
+ <config-property-type>java.lang.Integer</config-property-type>
+ <config-property-value>0</config-property-value>
+ </config-property>
+
+ <connectionfactory-interface>org.hornetq.ra.HornetQRAConnectionFactory</connectionfactory-interface>
+ <connectionfactory-impl-class>org.hornetq.ra.HornetQRAConnectionFactoryImpl</connectionfactory-impl-class>
+ <connection-interface>javax.jms.Session</connection-interface>
+ <connection-impl-class>org.hornetq.ra.HornetQRASession</connection-impl-class>
+ </connection-definition>
+ <transaction-support>XATransaction</transaction-support>
+ <authentication-mechanism>
+ <authentication-mechanism-type>BasicPassword</authentication-mechanism-type>
+ <credential-interface>javax.resource.spi.security.PasswordCredential</credential-interface>
+ </authentication-mechanism>
+ <reauthentication-support>false</reauthentication-support>
+ </outbound-resourceadapter>
+
+ <inbound-resourceadapter>
+ <messageadapter>
+ <messagelistener>
+ <messagelistener-type>javax.jms.MessageListener</messagelistener-type>
+ <activationspec>
+ <activationspec-class>org.hornetq.ra.inflow.HornetQActivationSpec</activationspec-class>
+ <required-config-property>
+ <config-property-name>destination</config-property-name>
+ </required-config-property>
+ </activationspec>
+ </messagelistener>
+ </messageadapter>
+ </inbound-resourceadapter>
+
+ </resourceadapter>
+ </connector>
+
+ </programlisting>
+ <para>The important part of this configuration is the part in bold, i.e. <config-property-value>host=127.0.0.1;port=5445</config-property-value>.
+ This should be configured to the host and port of the remote HornetQ server.</para>
+ </listitem>
+ </itemizedlist>
+ <para>At this point you should now be able to deploy MDBs that consume from the remote server. You will, however,
+ have to make sure that your MDBs have the annotation <literal>@ResourceAdapter("hornetq-ra.rar")</literal>
+ added; this is illustrated in the <xref linkend="configuring-mdbs">Configuring Message-Driven Beans</xref> section.
+ If you don't want to add this annotation then you can delete the generic resource adapter <literal>jms-ra.rar</literal>
+ and rename the <literal>hornetq-ra.rar</literal> to this.</para>
+ <para>If you also want to use the remote HornetQ server for outgoing connections, i.e. sending messages, then
+ do the following:</para>
+ <itemizedlist>
+ <listitem>
+ <para>Create a file called <literal>hornetq-ds.xml</literal> in the <literal>deploy</literal> directory
+ (in fact you can call this anything you want as long as it ends in <literal>-ds.xml</literal>). Then
+ add the following:</para>
+ <programlisting>
+ <connection-factories>
+ <!--
+ JMS XA Resource adapter, use this for outbound JMS connections.
+ Inbound connections are defined at the @MDB activation or at the resource-adapter properties.
+ -->
+ <tx-connection-factory>
+ <jndi-name>RemoteJmsXA</jndi-name>
+ <xa-transaction/>
+ <rar-name>hornetq-ra.rar</rar-name>
+ <connection-definition>org.hornetq.ra.HornetQRAConnectionFactory</connection-definition>
+ <config-property name="SessionDefaultType" type="java.lang.String">javax.jms.Topic</config-property>
+ <config-property name="ConnectorClassName" type="java.lang.String">org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</config-property>
+ <config-property name="ConnectionParameters" type="java.lang.String">host=127.0.0.1;port=5445</config-property>
+ <max-pool-size>20</max-pool-size>
+ </tx-connection-factory>
+
+
+ </connection-factories>
+ </programlisting>
+ <para>Again you will see that the host and port are configured here to match the remote HornetQ server's
+ configuration. The other important attributes are:</para>
+ <itemizedlist>
+ <listitem>
+ <para>jndi-name - This is the name used to look up the JMS connection factory from within your JEE client</para>
+ </listitem>
+ <listitem>
+ <para>rar-name - This should match the directory that you created to hold the Resource Adapter
+ configuration</para>
+ </listitem>
+ </itemizedlist>
+ </listitem>
+ </itemizedlist>
+ <para>Now you should be able to send messages using the JCA JMS connection pooling within an XA transaction.</para>
+ </section>
+ <section>
+ <title>Configuring JBoss 4</title>
+ <para>The steps to do this are exactly the same as for JBoss 5; you will also have to create a jboss.xml definition
+ file for your MDB with the following entry</para>
+ <programlisting>
+ <message-driven>
+ <ejb-name>MyMDB</ejb-name>
+ <resource-adapter-name>jms-ra.rar</resource-adapter-name>
+ </message-driven>
+ </programlisting>
+ <para>Also you will need to edit the <literal>standardjboss.xml</literal> and uncomment the section with the
+ following 'Uncomment to use JMS message inflow from jmsra.rar' and then comment out the invoker-proxy-binding
+ called 'message-driven-bean'</para>
+ </section>
+ </section>
+ <section>
<title>High Availability JNDI (HA-JNDI)</title>
<para>If you are using JNDI to look-up JMS queues, topics and connection factories from a
cluster of servers, it is likely you will want to use HA-JNDI so that your JNDI look-ups
13 years, 6 months
JBoss hornetq SVN: r9767 - in branches/2_2_0_HA_Improvements: tests/src/org/hornetq/tests/integration/cluster/distribution and 2 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-11 08:34:44 -0400 (Mon, 11 Oct 2010)
New Revision: 9767
Modified:
branches/2_2_0_HA_Improvements/build-hornetq.xml
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
Log:
test fixes
Modified: branches/2_2_0_HA_Improvements/build-hornetq.xml
===================================================================
--- branches/2_2_0_HA_Improvements/build-hornetq.xml 2010-10-11 01:08:38 UTC (rev 9766)
+++ branches/2_2_0_HA_Improvements/build-hornetq.xml 2010-10-11 12:34:44 UTC (rev 9767)
@@ -1385,7 +1385,8 @@
<exclude name="**/cluster/replication/**.class"/>
<exclude name="**/cluster/failover/*ReplicatedDistributionTest.class"/>
<exclude name="**/cluster/failover/*SharedStoreDistributionTest.class"/>
- <exclude name="**/cluster/failover/*ReplicatedNettyAsynchronousFailoverTest.class"/>
+ <exclude name="**/cluster/failover/*ReplicatedNettyAsynchronousFailoverTest.class"/>
+ <exclude name="**/cluster/failover/*ReplicatedJMSFailoverTest.class"/>
<exclude name="**/cluster/failover/Remote*.class"/>
<include name="${tests.param}"/>
</fileset>
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-10-11 01:08:38 UTC (rev 9766)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-10-11 12:34:44 UTC (rev 9767)
@@ -1571,21 +1571,27 @@
for (int node : nodes)
{
//wait for each server to start, it may be a backup and started in a separate thread
- long timetowait =System.currentTimeMillis() + 5000;
- while(!servers[node].isStarted())
+ waitForServer(servers[node]);
+ }
+ }
+
+ private void waitForServer(HornetQServer server)
+ throws InterruptedException
+ {
+ long timetowait =System.currentTimeMillis() + 5000;
+ while(!server.isStarted())
+ {
+ Thread.sleep(100);
+ if(server.isStarted())
{
- Thread.sleep(100);
- if(servers[node].isStarted())
- {
- break;
- }
- else if(System.currentTimeMillis() > timetowait)
- {
- fail("server didnt start");
- }
+ break;
}
+ else if(System.currentTimeMillis() > timetowait)
+ {
+ fail("server didnt start");
+ }
}
- }
+ }
protected void stopClusterConnections(final int... nodes) throws Exception
{
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-10-11 01:08:38 UTC (rev 9766)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-10-11 12:34:44 UTC (rev 9767)
@@ -26,12 +26,23 @@
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.util.RandomUtil;
+import java.io.File;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
/**
* A JMSUtil
*
@@ -170,6 +181,67 @@
}
}
+ public static void waitForServer(HornetQServer server)
+ throws InterruptedException
+ {
+ long timetowait =System.currentTimeMillis() + 5000;
+ while(!server.isStarted())
+ {
+ Thread.sleep(100);
+ if(server.isStarted())
+ {
+ break;
+ }
+ else if(System.currentTimeMillis() > timetowait)
+ {
+ throw new IllegalStateException("server didnt start");
+ }
+ }
+ }
+
+ public static void crash(HornetQServer server, ClientSession... sessions) throws Exception
+ {
+ final CountDownLatch latch = new CountDownLatch(sessions.length);
+
+ class MyListener implements SessionFailureListener
+ {
+ public void connectionFailed(final HornetQException me, boolean failedOver)
+ {
+ latch.countDown();
+ }
+
+ public void beforeReconnect(HornetQException exception)
+ {
+ System.out.println("MyListener.beforeReconnect");
+ }
+ }
+ for (ClientSession session : sessions)
+ {
+ session.addFailureListener(new MyListener());
+ }
+ Set<RemotingConnection> connections = server.getRemotingService().getConnections();
+ for (RemotingConnection remotingConnection : connections)
+ {
+ remotingConnection.destroy();
+ server.getRemotingService().removeConnection(remotingConnection.getID());
+ }
+
+ ClusterManagerImpl clusterManager = (ClusterManagerImpl) server.getClusterManager();
+ clusterManager.clear();
+ server.stop();
+ // recreate the live.lock file (since it was deleted by the
+ // clean stop
+ File lockFile = new File(server.getConfiguration().getJournalDirectory(), "live.lock");
+ Assert.assertFalse(lockFile.exists());
+ lockFile.createNewFile();
+
+
+ // Wait to be informed of failure
+ boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
+
+ Assert.assertTrue(ok);
+ }
+
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java 2010-10-11 01:08:38 UTC (rev 9766)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java 2010-10-11 12:34:44 UTC (rev 9767)
@@ -73,13 +73,13 @@
try
{
ServerLocator locator = createFactory(isNetty);
+ locator.setClientFailureCheckPeriod(pingPeriod);
+ locator.setRetryInterval(500);
+ locator.setRetryIntervalMultiplier(1d);
+ locator.setReconnectAttempts(-1);
+ locator.setConfirmationWindowSize(1024 * 1024);
ClientSessionFactory factory = locator.createSessionFactory();
- factory.getServerLocator().setClientFailureCheckPeriod(pingPeriod); // Using a smaller timeout
- factory.getServerLocator().setRetryInterval(500);
- factory.getServerLocator().setRetryIntervalMultiplier(1d);
- factory.getServerLocator().setReconnectAttempts(-1);
- factory.getServerLocator().setConfirmationWindowSize(1024 * 1024);
session = (ClientSessionInternal)factory.createSession();
13 years, 6 months
JBoss hornetq SVN: r9766 - in trunk/hornetq-rest/hornetq-rest/src: main/java/org/hornetq/rest/queue and 4 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-10-10 21:08:38 -0400 (Sun, 10 Oct 2010)
New Revision: 9766
Modified:
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/HornetqBootstrapListener.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/RestMessagingBootstrapListener.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/ServletContextBindingRegistry.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/FilePushStore.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/UriStrategy.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/CustomHeaderLinkStrategy.java
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/TimeoutTask.java
trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/JMSTest.java
trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/RawRestartTest.java
trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/TransformTest.java
Log:
HORNETQ-515 -- adapt to new api, also remove @override annotations on interface method implementations
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/HornetqBootstrapListener.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/HornetqBootstrapListener.java 2010-10-11 01:04:43 UTC (rev 9765)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/HornetqBootstrapListener.java 2010-10-11 01:08:38 UTC (rev 9766)
@@ -14,7 +14,6 @@
{
private EmbeddedJMS jms;
- @Override
public void contextInitialized(ServletContextEvent contextEvent)
{
ServletContext context = contextEvent.getServletContext();
@@ -30,7 +29,6 @@
}
}
- @Override
public void contextDestroyed(ServletContextEvent servletContextEvent)
{
try
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/RestMessagingBootstrapListener.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/RestMessagingBootstrapListener.java 2010-10-11 01:04:43 UTC (rev 9765)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/RestMessagingBootstrapListener.java 2010-10-11 01:08:38 UTC (rev 9766)
@@ -16,7 +16,6 @@
{
MessageServiceManager manager;
- @Override
public void contextInitialized(ServletContextEvent contextEvent)
{
ServletContext context = contextEvent.getServletContext();
@@ -44,7 +43,6 @@
}
}
- @Override
public void contextDestroyed(ServletContextEvent servletContextEvent)
{
if (manager != null)
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/ServletContextBindingRegistry.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/ServletContextBindingRegistry.java 2010-10-11 01:04:43 UTC (rev 9765)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/integration/ServletContextBindingRegistry.java 2010-10-11 01:08:38 UTC (rev 9766)
@@ -17,37 +17,31 @@
this.servletContext = servletContext;
}
- @Override
public Object lookup(String name)
{
return servletContext.getAttribute(name);
}
- @Override
public boolean bind(String name, Object obj)
{
servletContext.setAttribute(name, obj);
return true;
}
- @Override
public void unbind(String name)
{
servletContext.removeAttribute(name);
}
- @Override
public void close()
{
}
- @Override
public Object getContext()
{
return servletContext;
}
- @Override
public void setContext(Object o)
{
servletContext = (ServletContext)o;
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java 2010-10-11 01:04:43 UTC (rev 9765)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java 2010-10-11 01:08:38 UTC (rev 9766)
@@ -78,7 +78,6 @@
private Object timeoutLock = new Object();
- @Override
public void testTimeout(String target)
{
synchronized (timeoutLock)
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/FilePushStore.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/FilePushStore.java 2010-10-11 01:04:43 UTC (rev 9765)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/FilePushStore.java 2010-10-11 01:08:38 UTC (rev 9766)
@@ -57,7 +57,6 @@
return list;
}
- @Override
public synchronized List<PushRegistration> getByDestination(String destination)
{
List<PushRegistration> list = new ArrayList<PushRegistration>();
@@ -71,7 +70,6 @@
return list;
}
- @Override
public synchronized void update(PushRegistration reg) throws Exception
{
if (reg.getLoadedFrom() == null) return;
@@ -86,7 +84,6 @@
marshaller.marshal(reg, (File) reg.getLoadedFrom());
}
- @Override
public synchronized void add(PushRegistration reg) throws Exception
{
map.put(reg.getId(), reg);
@@ -97,7 +94,6 @@
save(reg);
}
- @Override
public synchronized void remove(PushRegistration reg) throws Exception
{
map.remove(reg.getId());
@@ -106,7 +102,6 @@
fp.delete();
}
- @Override
public synchronized void removeAll() throws Exception
{
ArrayList<PushRegistration> copy = new ArrayList<PushRegistration>();
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java 2010-10-11 01:04:43 UTC (rev 9765)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java 2010-10-11 01:08:38 UTC (rev 9766)
@@ -100,7 +100,6 @@
}
}
- @Override
public void onMessage(ClientMessage clientMessage)
{
if (strategy.push(clientMessage) == false)
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/UriStrategy.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/UriStrategy.java 2010-10-11 01:04:43 UTC (rev 9765)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/UriStrategy.java 2010-10-11 01:08:38 UTC (rev 9766)
@@ -30,7 +30,6 @@
protected String method;
protected String contentType;
- @Override
public void setRegistration(PushRegistration reg)
{
this.registration = reg;
@@ -63,12 +62,10 @@
}
}
- @Override
public void stop()
{
}
- @Override
public boolean push(ClientMessage message)
{
String uri = createUri(message);
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java 2010-10-11 01:04:43 UTC (rev 9765)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java 2010-10-11 01:08:38 UTC (rev 9766)
@@ -18,7 +18,6 @@
super(dirname);
}
- @Override
public synchronized List<PushTopicRegistration> getByTopic(String topic)
{
List<PushTopicRegistration> list = new ArrayList<PushTopicRegistration>();
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java 2010-10-11 01:04:43 UTC (rev 9765)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java 2010-10-11 01:08:38 UTC (rev 9766)
@@ -84,7 +84,6 @@
private Object timeoutLock = new Object();
- @Override
public void testTimeout(String target)
{
synchronized (timeoutLock)
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/CustomHeaderLinkStrategy.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/CustomHeaderLinkStrategy.java 2010-10-11 01:04:43 UTC (rev 9765)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/CustomHeaderLinkStrategy.java 2010-10-11 01:08:38 UTC (rev 9766)
@@ -8,7 +8,6 @@
*/
public class CustomHeaderLinkStrategy implements LinkStrategy
{
- @Override
public void setLinkHeader(Response.ResponseBuilder builder, String title, String rel, String href, String type)
{
String headerName = null;
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/TimeoutTask.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/TimeoutTask.java 2010-10-11 01:04:43 UTC (rev 9765)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/TimeoutTask.java 2010-10-11 01:08:38 UTC (rev 9766)
@@ -58,7 +58,6 @@
thread.start();
}
- @Override
public void run()
{
while (running)
Modified: trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/JMSTest.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/JMSTest.java 2010-10-11 01:04:43 UTC (rev 9765)
+++ trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/JMSTest.java 2010-10-11 01:08:38 UTC (rev 9766)
@@ -2,6 +2,7 @@
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.rest.HttpHeaderProperty;
import org.hornetq.rest.Jms;
import org.hornetq.rest.queue.QueueDeployment;
@@ -39,7 +40,7 @@
@BeforeClass
public static void setup() throws Exception
{
- connectionFactory = new HornetQConnectionFactory(manager.getQueueManager().getSessionFactory());
+ connectionFactory = new HornetQJMSConnectionFactory(manager.getQueueManager().getSessionFactory());
}
@XmlRootElement
@@ -128,7 +129,6 @@
public static Order order;
public static CountDownLatch latch = new CountDownLatch(1);
- @Override
public void onMessage(Message message)
{
try
Modified: trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/RawRestartTest.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/RawRestartTest.java 2010-10-11 01:04:43 UTC (rev 9765)
+++ trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/RawRestartTest.java 2010-10-11 01:08:38 UTC (rev 9766)
@@ -80,7 +80,6 @@
private static class MyListener implements MessageHandler
{
- @Override
public void onMessage(ClientMessage message)
{
int size = message.getBodyBuffer().readInt();
Modified: trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/TransformTest.java
===================================================================
--- trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/TransformTest.java 2010-10-11 01:04:43 UTC (rev 9765)
+++ trunk/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/TransformTest.java 2010-10-11 01:08:38 UTC (rev 9766)
@@ -180,7 +180,6 @@
public static Order order;
public static CountDownLatch latch = new CountDownLatch(1);
- @Override
public void onMessage(ClientMessage clientMessage)
{
System.out.println("onMessage!");
13 years, 6 months
JBoss hornetq SVN: r9765 - in trunk: docs/user-manual/zh and 12 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-10-10 21:04:43 -0400 (Sun, 10 Oct 2010)
New Revision: 9765
Added:
trunk/src/main/org/hornetq/jms/client/HornetQJMSConnectionFactory.java
trunk/src/main/org/hornetq/jms/client/HornetQQueueConnectionFactory.java
trunk/src/main/org/hornetq/jms/client/HornetQTopicConnectionFactory.java
trunk/src/main/org/hornetq/jms/client/HornetQXAConnectionFactory.java
trunk/src/main/org/hornetq/jms/client/HornetQXAQueueConnectionFactory.java
trunk/src/main/org/hornetq/jms/client/HornetQXATopicConnectionFactory.java
trunk/src/main/org/hornetq/jms/server/impl/JMSFactoryType.java
Modified:
trunk/docs/user-manual/en/configuration-index.xml
trunk/docs/user-manual/en/using-jms.xml
trunk/docs/user-manual/zh/configuration-index.xml
trunk/docs/user-manual/zh/using-jms.xml
trunk/src/config/common/schema/hornetq-jms.xsd
trunk/src/main/org/hornetq/api/core/client/HornetQClient.java
trunk/src/main/org/hornetq/api/jms/HornetQJMSClient.java
trunk/src/main/org/hornetq/jms/bridge/ConnectionFactoryFactory.java
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
trunk/src/main/org/hornetq/jms/bridge/impl/JNDIConnectionFactoryFactory.java
trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
Log:
HORNETQ-515 -- code and doc
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/docs/user-manual/en/configuration-index.xml 2010-10-11 01:04:43 UTC (rev 9765)
@@ -1018,6 +1018,22 @@
<colspec colname="c4" colnum="4"/>
<tbody>
<row>
+ <entry id="configuration.connection-factory.signature">
+ <link linkend="using-jms.configure.factory.types">connection-factory.signature (attribute)</link>
+ </entry>
+ <entry>String</entry>
+ <entry>Type of connection factory</entry>
+ <entry>generic</entry>
+ </row>
+ <row>
+ <entry id="configuration.connection-factory.xa">
+ <link linkend="using-jms.configure.factory.types">connection-factory.xa</link>
+ </entry>
+ <entry>Boolean</entry>
+ <entry>Whether it is an XA connection factory</entry>
+ <entry>false</entry>
+ </row>
+ <row>
<entry id="configuration.connection-factory.auto-group">
<link linkend="message-grouping.jmsconfigure">connection-factory.auto-group</link>
</entry>
Modified: trunk/docs/user-manual/en/using-jms.xml
===================================================================
--- trunk/docs/user-manual/en/using-jms.xml 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/docs/user-manual/en/using-jms.xml 2010-10-11 01:04:43 UTC (rev 9765)
@@ -87,6 +87,82 @@
defines the transport and parameters used to actually connect to the server.</para>
</note>
</section>
+ <section id="using-jms.configure.factory.types">
+ <title>Connection Factory Types</title>
+ <para>The JMS API defines several connection factory interfaces for applications. HornetQ JMS users
+ can choose to configure the types for their connection factories. Each connection factory
+ has a <literal>signature</literal> attribute and an <literal>xa</literal> parameter, the
+ combination of which determines the type of the factory. Attribute <literal>signature</literal>
+ has three possible string values, i.e. <emphasis>generic</emphasis>,
+ <emphasis>queue</emphasis> and <emphasis>topic</emphasis>; <literal>xa</literal> is a boolean
+ type parameter. The following table gives their configuration values for different
+ connection factory interfaces.</para>
+ <table frame="topbot" id="using-jms.table.configure.factory.types">
+ <title>Configuration for Connection Factory Types</title>
+ <tgroup cols="3">
+ <colspec colname="signature" colnum="1"/>
+ <colspec colname="xa" colnum="2"/>
+ <colspec colname="cftype" colnum="3"/>
+ <thead>
+ <row>
+ <entry>signature</entry>
+ <entry>xa</entry>
+ <entry>Connection Factory Type</entry>
+ </row>
+ </thead>
+ <tbody>
+ <row>
+ <entry>generic (default)</entry>
+ <entry>false (default)</entry>
+ <entry>javax.jms.ConnectionFactory</entry>
+ </row>
+ <row>
+ <entry>generic</entry>
+ <entry>true</entry>
+ <entry>javax.jms.XAConnectionFactory</entry>
+ </row>
+ <row>
+ <entry>queue</entry>
+ <entry>false</entry>
+ <entry>javax.jms.QueueConnectionFactory</entry>
+ </row>
+ <row>
+ <entry>queue</entry>
+ <entry>true</entry>
+ <entry>javax.jms.XAQueueConnectionFactory</entry>
+ </row>
+ <row>
+ <entry>topic</entry>
+ <entry>false</entry>
+ <entry>javax.jms.TopicConnectionFactory</entry>
+ </row>
+ <row>
+ <entry>topic</entry>
+ <entry>true</entry>
+ <entry>javax.jms.XATopicConnectionFactory</entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+ <para>As an example, the following configures an XAQueueConnectionFactory:</para>
+ <programlisting>
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq ../schemas/hornetq-jms.xsd ">
+
+ <connection-factory name="ConnectionFactory" signature="queue">
+ <xa>true</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+</configuration>
+ </programlisting>
+
+ </section>
<section>
<title>JNDI configuration</title>
<para>When using JNDI from the client side you need to specify a set of JNDI properties
Modified: trunk/docs/user-manual/zh/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/zh/configuration-index.xml 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/docs/user-manual/zh/configuration-index.xml 2010-10-11 01:04:43 UTC (rev 9765)
@@ -952,6 +952,22 @@
<entry/>
</row>
<row>
+ <entry id="configuration.connection-factory.signature">
+ <link linkend="using-jms.configure.factory.types">connection-factory.signature (属性)</link>
+ </entry>
+ <entry>String</entry>
+ <entry>连接工厂的类型</entry>
+ <entry>generic</entry>
+ </row>
+ <row>
+ <entry id="configuration.connection-factory.xa">
+ <link linkend="using-jms.configure.factory.types">connection-factory.xa</link>
+ </entry>
+ <entry>Boolean</entry>
+ <entry>是否是XA类型的连接工厂</entry>
+ <entry>false</entry>
+ </row>
+ <row>
<entry id="configuration.connection-factory.auto-group">
<link linkend="message-grouping.jmsconfigure">connection-factory.auto-group</link>
</entry>
Modified: trunk/docs/user-manual/zh/using-jms.xml
===================================================================
--- trunk/docs/user-manual/zh/using-jms.xml 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/docs/user-manual/zh/using-jms.xml 2010-10-11 01:04:43 UTC (rev 9765)
@@ -72,6 +72,77 @@
<literal>hornetq-configuration.xml</literal> 中。它定义了采用何种传输与服务器连接。</para>
</note>
</section>
+ <section id="using-jms.configure.factory.types">
+ <title>连接工厂的类型</title>
+ <para>在JMS API文档中有几种不同类型的连接工厂供用户使用。HornetQ为用户提供了配置连接工厂类型的参数。用户可以通过
+ 配置连接工厂的"signature"属性和"xa"参数来得到想要的类型。"signature"属性是字符串类型,它有三个可选值:
+ <emphasis>generic</emphasis>、<emphasis>queue</emphasis>和<emphasis>topic</emphasis>;
+ <literal>xa</literal>是一个布尔型参数。下表给出了不同类型连接工厂对应的配置值:</para>
+ <table frame="topbot" id="using-jms.table.configure.factory.types">
+ <title>连接工厂类型的配置</title>
+ <tgroup cols="3">
+ <colspec colname="signature" colnum="1"/>
+ <colspec colname="xa" colnum="2"/>
+ <colspec colname="cftype" colnum="3"/>
+ <thead>
+ <row>
+ <entry>signature</entry>
+ <entry>xa</entry>
+ <entry>连接工厂的类型</entry>
+ </row>
+ </thead>
+ <tbody>
+ <row>
+ <entry>generic (默认)</entry>
+ <entry>false (默认)</entry>
+ <entry>javax.jms.ConnectionFactory</entry>
+ </row>
+ <row>
+ <entry>generic</entry>
+ <entry>true</entry>
+ <entry>javax.jms.XAConnectionFactory</entry>
+ </row>
+ <row>
+ <entry>queue</entry>
+ <entry>false</entry>
+ <entry>javax.jms.QueueConnectionFactory</entry>
+ </row>
+ <row>
+ <entry>queue</entry>
+ <entry>true</entry>
+ <entry>javax.jms.XAQueueConnectionFactory</entry>
+ </row>
+ <row>
+ <entry>topic</entry>
+ <entry>false</entry>
+ <entry>javax.jms.TopicConnectionFactory</entry>
+ </row>
+ <row>
+ <entry>topic</entry>
+ <entry>true</entry>
+ <entry>javax.jms.XATopicConnectionFactory</entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+ <para>下面的例子配置了一个XAQueueConnectionFactory:</para>
+ <programlisting>
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq ../schemas/hornetq-jms.xsd ">
+
+ <connection-factory name="ConnectionFactory" signature="queue">
+ <xa>true</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+</configuration>
+ </programlisting>
+ </section>
<section>
<title>JNDI的配置</title>
<para>当客户端使用JNDI时需要定义一些JNDI的参数。这些参数主要用来确定JNDI服务的地址。这些参数通常保存在
Modified: trunk/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-jms.xsd 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/src/config/common/schema/hornetq-jms.xsd 2010-10-11 01:04:43 UTC (rev 9765)
@@ -26,6 +26,7 @@
<xsd:element name="connection-factory">
<xsd:complexType>
<xsd:all>
+ <xsd:element name="xa" type="xsd:boolean" maxOccurs="1" minOccurs="0"></xsd:element>
<xsd:element name="discovery-group-ref" type="discovery-group-refType" maxOccurs="1" minOccurs="0"></xsd:element>
<xsd:element name="discovery-initial-wait-timeout" type="xsd:long" maxOccurs="1" minOccurs="0"></xsd:element>
@@ -135,6 +136,7 @@
</xsd:element>
</xsd:all>
<xsd:attribute name="name" type="xsd:string"></xsd:attribute>
+ <xsd:attribute name="signature" type="xsd:string"></xsd:attribute>
</xsd:complexType>
</xsd:element>
Modified: trunk/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -91,6 +91,8 @@
public static final boolean DEFAULT_CACHE_LARGE_MESSAGE_CLIENT = false;
public static final int DEFAULT_INITIAL_MESSAGE_PACKET_SIZE = 1500;
+
+ public static final boolean DEFAULT_XA = false;
/**
* Creates a ClientSessionFactory using all the defaults.
Modified: trunk/src/main/org/hornetq/api/jms/HornetQJMSClient.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/HornetQJMSClient.java 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/src/main/org/hornetq/api/jms/HornetQJMSClient.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -23,6 +23,13 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.jms.client.HornetQQueueConnectionFactory;
+import org.hornetq.jms.client.HornetQTopicConnectionFactory;
+import org.hornetq.jms.client.HornetQXAConnectionFactory;
+import org.hornetq.jms.client.HornetQXAQueueConnectionFactory;
+import org.hornetq.jms.client.HornetQXATopicConnectionFactory;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* A utility class for creating HornetQ client-side JMS managed resources.
@@ -49,9 +56,9 @@
* @param sessionFactory The underlying ClientSessionFactory to use.
* @return The HornetQConnectionFactory.
*/
- public static HornetQConnectionFactory createConnectionFactory(final ClientSessionFactory sessionFactory)
+ public static HornetQJMSConnectionFactory createConnectionFactory(final ClientSessionFactory sessionFactory)
{
- return new HornetQConnectionFactory(sessionFactory);
+ return new HornetQJMSConnectionFactory(sessionFactory);
}
/**
@@ -59,11 +66,38 @@
*
* @param discoveryAddress The address to use for discovery.
* @param discoveryPort The port to use for discovery.
+ * @param jmsFactoryType
* @return The HornetQConnectionFactory.
*/
- public static HornetQConnectionFactory createConnectionFactory(final String discoveryAddress, final int discoveryPort)
+ public static HornetQConnectionFactory createConnectionFactory(final String discoveryAddress, final int discoveryPort, JMSFactoryType jmsFactoryType)
{
- return new HornetQConnectionFactory(discoveryAddress, discoveryPort);
+ HornetQConnectionFactory factory = null;
+ if (jmsFactoryType.equals(JMSFactoryType.CF))
+ {
+ factory = new HornetQJMSConnectionFactory(discoveryAddress, discoveryPort);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_CF))
+ {
+ factory = new HornetQQueueConnectionFactory(discoveryAddress, discoveryPort);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_CF))
+ {
+ factory = new HornetQTopicConnectionFactory(discoveryAddress, discoveryPort);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.XA_CF))
+ {
+ factory = new HornetQXAConnectionFactory(discoveryAddress, discoveryPort);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_XA_CF))
+ {
+ factory = new HornetQXAQueueConnectionFactory(discoveryAddress, discoveryPort);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_XA_CF))
+ {
+ factory = new HornetQXATopicConnectionFactory(discoveryAddress, discoveryPort);
+ }
+
+ return factory;
}
/**
@@ -72,22 +106,75 @@
* @param staticConnectors The list of TransportConfiguration to use.
* @return The HornetQConnectionFactory.
*/
- public static HornetQConnectionFactory createConnectionFactory(final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors)
+ public static HornetQConnectionFactory createConnectionFactory(final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors,
+ final JMSFactoryType jmsFactoryType)
{
- return new HornetQConnectionFactory(staticConnectors);
+ HornetQConnectionFactory factory = null;
+ if (jmsFactoryType.equals(JMSFactoryType.CF))
+ {
+ factory = new HornetQJMSConnectionFactory(staticConnectors);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_CF))
+ {
+ factory = new HornetQQueueConnectionFactory(staticConnectors);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_CF))
+ {
+ factory = new HornetQTopicConnectionFactory(staticConnectors);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.XA_CF))
+ {
+ factory = new HornetQXAConnectionFactory(staticConnectors);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_XA_CF))
+ {
+ factory = new HornetQXAQueueConnectionFactory(staticConnectors);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_XA_CF))
+ {
+ factory = new HornetQXATopicConnectionFactory(staticConnectors);
+ }
+
+ return factory;
}
/**
* Creates a HornetQConnectionFactory using a single pair of live-backup TransportConfiguration.
*
- * @param connectorConfig The TransportConfiguration of the server to connect to.
- * @param backupConnectorConfig The TransportConfiguration of the backup server to connect to. can be null.
+ * @param connectorConfig The TransportConfiguration of the server to connect to.
* @return The HornetQConnectionFactory.
*/
public static HornetQConnectionFactory createConnectionFactory(final TransportConfiguration connectorConfig,
- final TransportConfiguration backupConnectorConfig)
+ final TransportConfiguration backupConnectorConfig,
+ final JMSFactoryType jmsFactoryType)
{
- return new HornetQConnectionFactory(connectorConfig, backupConnectorConfig);
+ HornetQConnectionFactory factory = null;
+ if (jmsFactoryType.equals(JMSFactoryType.CF))
+ {
+ factory = new HornetQJMSConnectionFactory(connectorConfig, backupConnectorConfig);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_CF))
+ {
+ factory = new HornetQQueueConnectionFactory(connectorConfig, backupConnectorConfig);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_CF))
+ {
+ factory = new HornetQTopicConnectionFactory(connectorConfig, backupConnectorConfig);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.XA_CF))
+ {
+ factory = new HornetQXAConnectionFactory(connectorConfig, backupConnectorConfig);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_XA_CF))
+ {
+ factory = new HornetQXAQueueConnectionFactory(connectorConfig, backupConnectorConfig);
+ }
+ else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_XA_CF))
+ {
+ factory = new HornetQXATopicConnectionFactory(connectorConfig, backupConnectorConfig);
+ }
+
+ return factory;
}
/**
@@ -96,11 +183,16 @@
* @param connectorConfig The TransportConfiguration of the server.
* @return The HornetQConnectionFactory.
*/
- public static HornetQConnectionFactory createConnectionFactory(final TransportConfiguration connectorConfig)
+ public static HornetQJMSConnectionFactory createConnectionFactory(final TransportConfiguration connectorConfig)
{
- return new HornetQConnectionFactory(connectorConfig);
+ return new HornetQJMSConnectionFactory(connectorConfig);
}
+ public static HornetQXAConnectionFactory createXAConnectionFactory(final TransportConfiguration connectorConfig)
+ {
+ return new HornetQXAConnectionFactory(connectorConfig);
+ }
+
/**
* Creates a client-side representation of a JMS Topic.
*
Modified: trunk/src/main/org/hornetq/jms/bridge/ConnectionFactoryFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/ConnectionFactoryFactory.java 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/src/main/org/hornetq/jms/bridge/ConnectionFactoryFactory.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -13,8 +13,6 @@
package org.hornetq.jms.bridge;
-import javax.jms.ConnectionFactory;
-
/**
* A ConnectionFactoryFactory
*
@@ -26,5 +24,5 @@
*/
public interface ConnectionFactoryFactory
{
- ConnectionFactory createConnectionFactory() throws Exception;
+ Object createConnectionFactory() throws Exception;
}
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -969,7 +969,7 @@
{
Connection conn;
- ConnectionFactory cf = cff.createConnectionFactory();
+ Object cf = cff.createConnectionFactory();
if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE && !(cf instanceof XAConnectionFactory))
{
@@ -992,7 +992,7 @@
{
JMSBridgeImpl.log.trace("Creating a non XA connection");
}
- conn = cf.createConnection();
+ conn = ((ConnectionFactory)cf).createConnection();
}
}
else
@@ -1011,7 +1011,7 @@
{
JMSBridgeImpl.log.trace("Creating a non XA connection");
}
- conn = cf.createConnection(username, password);
+ conn = ((ConnectionFactory)cf).createConnection(username, password);
}
}
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JNDIConnectionFactoryFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JNDIConnectionFactoryFactory.java 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JNDIConnectionFactoryFactory.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -15,8 +15,6 @@
import java.util.Hashtable;
-import javax.jms.ConnectionFactory;
-
import org.hornetq.jms.bridge.ConnectionFactoryFactory;
@@ -36,9 +34,9 @@
super(jndiProperties, lookup);
}
- public ConnectionFactory createConnectionFactory() throws Exception
+ public Object createConnectionFactory() throws Exception
{
- return (ConnectionFactory)createObject();
+ return createObject();
}
}
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -17,18 +17,12 @@
import java.util.List;
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
import javax.jms.XAConnection;
-import javax.jms.XAConnectionFactory;
import javax.jms.XAQueueConnection;
-import javax.jms.XAQueueConnectionFactory;
import javax.jms.XATopicConnection;
-import javax.jms.XATopicConnectionFactory;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
@@ -48,8 +42,7 @@
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt> $Id$
*/
-public class HornetQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
- XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory, Serializable, Referenceable
+public class HornetQConnectionFactory implements Serializable, Referenceable
{
// Constants ------------------------------------------------------------------------------------
Added: trunk/src/main/org/hornetq/jms/client/HornetQJMSConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQJMSConnectionFactory.java (rev 0)
+++ trunk/src/main/org/hornetq/jms/client/HornetQJMSConnectionFactory.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.client;
+
+import java.util.List;
+
+import javax.jms.ConnectionFactory;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+
+
+/**
+ * A class that represents a ConnectionFactory.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class HornetQJMSConnectionFactory extends HornetQConnectionFactory implements ConnectionFactory
+{
+
+ private final static long serialVersionUID = -2810634789345348326L;
+
+ public HornetQJMSConnectionFactory(TransportConfiguration transportConfiguration)
+ {
+ super(transportConfiguration);
+ }
+
+ public HornetQJMSConnectionFactory(ClientSessionFactory sessionFactory)
+ {
+ super(sessionFactory);
+ }
+
+ public HornetQJMSConnectionFactory(String discoveryAddress, int discoveryPort)
+ {
+ super(discoveryAddress, discoveryPort);
+ }
+
+ public HornetQJMSConnectionFactory(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs)
+ {
+ super(connectorConfigs);
+ }
+
+ public HornetQJMSConnectionFactory(TransportConfiguration connectorConfig,
+ TransportConfiguration backupConnectorConfig)
+ {
+ super(connectorConfig, backupConnectorConfig);
+ }
+}
Added: trunk/src/main/org/hornetq/jms/client/HornetQQueueConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQQueueConnectionFactory.java (rev 0)
+++ trunk/src/main/org/hornetq/jms/client/HornetQQueueConnectionFactory.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.client;
+
+import java.util.List;
+
+import javax.jms.QueueConnectionFactory;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+
+/**
+ * A class that represents a QueueConnectionFactory.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class HornetQQueueConnectionFactory extends HornetQConnectionFactory implements QueueConnectionFactory
+{
+ private static final long serialVersionUID = 5312455021322463546L;
+
+ public HornetQQueueConnectionFactory(String discoveryAddress, int discoveryPort)
+ {
+ super(discoveryAddress, discoveryPort);
+ }
+
+ public HornetQQueueConnectionFactory(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs)
+ {
+ super(connectorConfigs);
+ }
+
+ public HornetQQueueConnectionFactory(TransportConfiguration connectorConfig,
+ TransportConfiguration backupConnectorConfig)
+ {
+ super(connectorConfig, backupConnectorConfig);
+ }
+}
Added: trunk/src/main/org/hornetq/jms/client/HornetQTopicConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQTopicConnectionFactory.java (rev 0)
+++ trunk/src/main/org/hornetq/jms/client/HornetQTopicConnectionFactory.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.client;
+
+import java.util.List;
+
+import javax.jms.TopicConnectionFactory;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+
+/**
+ * A class that represents a TopicConnectionFactory.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class HornetQTopicConnectionFactory extends HornetQConnectionFactory implements TopicConnectionFactory
+{
+ private static final long serialVersionUID = 7317051989866548455L;
+
+ public HornetQTopicConnectionFactory(String discoveryAddress, int discoveryPort)
+ {
+ super(discoveryAddress, discoveryPort);
+ }
+
+ public HornetQTopicConnectionFactory(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs)
+ {
+ super(connectorConfigs);
+ }
+
+ public HornetQTopicConnectionFactory(TransportConfiguration connectorConfig,
+ TransportConfiguration backupConnectorConfig)
+ {
+ super(connectorConfig, backupConnectorConfig);
+ }
+}
Added: trunk/src/main/org/hornetq/jms/client/HornetQXAConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQXAConnectionFactory.java (rev 0)
+++ trunk/src/main/org/hornetq/jms/client/HornetQXAConnectionFactory.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.client;
+
+import java.util.List;
+
+import javax.jms.XAConnectionFactory;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+
+/**
+ * A class that represents an XAConnectionFactory.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class HornetQXAConnectionFactory extends HornetQConnectionFactory implements XAConnectionFactory
+{
+ private static final long serialVersionUID = 743611571839154115L;
+
+ public HornetQXAConnectionFactory(String discoveryAddress, int discoveryPort)
+ {
+ super(discoveryAddress, discoveryPort);
+ }
+
+ public HornetQXAConnectionFactory(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs)
+ {
+ super(connectorConfigs);
+ }
+
+ public HornetQXAConnectionFactory(TransportConfiguration connectorConfig,
+ TransportConfiguration backupConnectorConfig)
+ {
+ super(connectorConfig, backupConnectorConfig);
+ }
+
+ public HornetQXAConnectionFactory(TransportConfiguration connectorConfig)
+ {
+ super(connectorConfig);
+ }
+}
Added: trunk/src/main/org/hornetq/jms/client/HornetQXAQueueConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQXAQueueConnectionFactory.java (rev 0)
+++ trunk/src/main/org/hornetq/jms/client/HornetQXAQueueConnectionFactory.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.client;
+
+import java.util.List;
+
+import javax.jms.XAQueueConnectionFactory;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+
+/**
+ * A class that represents an XAQueueConnectionFactory.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class HornetQXAQueueConnectionFactory extends HornetQConnectionFactory implements XAQueueConnectionFactory
+{
+ private static final long serialVersionUID = 8612457847251087454L;
+
+ public HornetQXAQueueConnectionFactory(String discoveryAddress, int discoveryPort)
+ {
+ super(discoveryAddress, discoveryPort);
+ }
+
+ public HornetQXAQueueConnectionFactory(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs)
+ {
+ super(connectorConfigs);
+ }
+
+ public HornetQXAQueueConnectionFactory(TransportConfiguration connectorConfig,
+ TransportConfiguration backupConnectorConfig)
+ {
+ super(connectorConfig, backupConnectorConfig);
+ }
+}
Added: trunk/src/main/org/hornetq/jms/client/HornetQXATopicConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQXATopicConnectionFactory.java (rev 0)
+++ trunk/src/main/org/hornetq/jms/client/HornetQXATopicConnectionFactory.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.client;
+
+import java.util.List;
+
+import javax.jms.XATopicConnectionFactory;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+
+/**
+ * A class that represents an XATopicConnectionFactory.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class HornetQXATopicConnectionFactory extends HornetQConnectionFactory implements XATopicConnectionFactory
+{
+ private static final long serialVersionUID = -7018290426884419693L;
+
+ public HornetQXATopicConnectionFactory(String discoveryAddress, int discoveryPort)
+ {
+ super(discoveryAddress, discoveryPort);
+ }
+
+ public HornetQXATopicConnectionFactory(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs)
+ {
+ super(connectorConfigs);
+ }
+
+ public HornetQXATopicConnectionFactory(TransportConfiguration connectorConfig,
+ TransportConfiguration backupConnectorConfig)
+ {
+ super(connectorConfig, backupConnectorConfig);
+ }
+}
Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -38,6 +38,7 @@
import org.hornetq.api.jms.management.TopicControl;
import org.hornetq.core.management.impl.MBeanInfoHelper;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
@@ -163,7 +164,7 @@
{
TransportConfiguration liveTC = new TransportConfiguration(liveTransportClassName, liveTransportParams);
- server.createConnectionFactory(name, liveTC, JMSServerControlImpl.convert(jndiBindings));
+ server.createConnectionFactory(name, liveTC, JMSFactoryType.CF, JMSServerControlImpl.convert(jndiBindings));
sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
}
@@ -173,6 +174,121 @@
}
}
+ public void createXAConnectionFactory(final String name,
+ final String liveTransportClassName,
+ final Map<String, Object> liveTransportParams,
+ final Object[] jndiBindings) throws Exception
+ {
+ checkStarted();
+
+ clearIO();
+
+ try
+ {
+ TransportConfiguration liveTC = new TransportConfiguration(liveTransportClassName, liveTransportParams);
+
+ server.createConnectionFactory(name, liveTC, JMSFactoryType.XA_CF, JMSServerControlImpl.convert(jndiBindings));
+
+ sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
+ public void createQueueConnectionFactory(final String name,
+ final String liveTransportClassName,
+ final Map<String, Object> liveTransportParams,
+ final Object[] jndiBindings) throws Exception
+ {
+ checkStarted();
+
+ clearIO();
+
+ try
+ {
+ TransportConfiguration liveTC = new TransportConfiguration(liveTransportClassName, liveTransportParams);
+
+ server.createConnectionFactory(name, liveTC, JMSFactoryType.QUEUE_CF, JMSServerControlImpl.convert(jndiBindings));
+
+ sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
+ public void createTopicConnectionFactory(final String name,
+ final String liveTransportClassName,
+ final Map<String, Object> liveTransportParams,
+ final Object[] jndiBindings) throws Exception
+ {
+ checkStarted();
+
+ clearIO();
+
+ try
+ {
+ TransportConfiguration liveTC = new TransportConfiguration(liveTransportClassName, liveTransportParams);
+
+ server.createConnectionFactory(name, liveTC, JMSFactoryType.TOPIC_CF, JMSServerControlImpl.convert(jndiBindings));
+
+ sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
+ public void createXAQueueConnectionFactory(final String name,
+ final String liveTransportClassName,
+ final Map<String, Object> liveTransportParams,
+ final Object[] jndiBindings) throws Exception
+ {
+ checkStarted();
+
+ clearIO();
+
+ try
+ {
+ TransportConfiguration liveTC = new TransportConfiguration(liveTransportClassName, liveTransportParams);
+
+ server.createConnectionFactory(name, liveTC, JMSFactoryType.QUEUE_XA_CF, JMSServerControlImpl.convert(jndiBindings));
+
+ sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
+ public void createXATopicConnectionFactory(final String name,
+ final String liveTransportClassName,
+ final Map<String, Object> liveTransportParams,
+ final Object[] jndiBindings) throws Exception
+ {
+ checkStarted();
+
+ clearIO();
+
+ try
+ {
+ TransportConfiguration liveTC = new TransportConfiguration(liveTransportClassName, liveTransportParams);
+
+ server.createConnectionFactory(name, liveTC, JMSFactoryType.TOPIC_XA_CF, JMSServerControlImpl.convert(jndiBindings));
+
+ sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
public void createConnectionFactory(final String name,
final Object[] liveConnectorsTransportClassNames,
final Object[] liveConnectorTransportParams,
Modified: trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -25,6 +25,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.spi.core.naming.BindingRegistry;
/**
@@ -170,7 +171,7 @@
TransportConfiguration backupTC,
String ... bindings) throws Exception;
- void createConnectionFactory(String name, TransportConfiguration liveTC, String ... bindings) throws Exception;
+ void createConnectionFactory(String name, TransportConfiguration liveTC, JMSFactoryType cfType, String ... bindings) throws Exception;
void createConnectionFactory(String name,
String clientID,
@@ -222,6 +223,7 @@
boolean failoverOnInitialConnection,
boolean failoverOnServerShutdown,
String groupId,
+ JMSFactoryType factoryType,
String ... bindings) throws Exception;
void createConnectionFactory(String name,
Modified: trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -19,6 +19,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* A ConnectionFactoryConfiguration
@@ -202,4 +203,8 @@
String getGroupID();
void setGroupID(String groupID);
+
+ void setFactoryType(JMSFactoryType factType);
+
+ JMSFactoryType getFactoryType();
}
Modified: trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.utils.BufferHelper;
import org.hornetq.utils.DataConstants;
@@ -115,6 +116,8 @@
private String groupID = null;
+ private JMSFactoryType factoryType = JMSFactoryType.CF;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -671,6 +674,8 @@
failoverOnServerShutdown = buffer.readBoolean();
groupID = BufferHelper.readNullableSimpleStringAsString(buffer);
+
+ factoryType = JMSFactoryType.valueOf(buffer.readInt());
}
/* (non-Javadoc)
@@ -762,6 +767,8 @@
buffer.writeBoolean(failoverOnServerShutdown);
BufferHelper.writeAsNullableSimpleString(buffer, groupID);
+
+ buffer.writeInt(factoryType.intValue());
}
@@ -860,8 +867,20 @@
DataConstants.SIZE_BOOLEAN + // failoverOnServerShutdown
- BufferHelper.sizeOfNullableSimpleString(groupID);
+ BufferHelper.sizeOfNullableSimpleString(groupID) +
+
+ DataConstants.SIZE_INT; //factoryType
}
+
+ public void setFactoryType(JMSFactoryType factoryType)
+ {
+ this.factoryType = factoryType;
+ }
+
+ public JMSFactoryType getFactoryType()
+ {
+ return factoryType;
+ }
// Public --------------------------------------------------------
Added: trunk/src/main/org/hornetq/jms/server/impl/JMSFactoryType.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSFactoryType.java (rev 0)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSFactoryType.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.server.impl;
+
+/**
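+ * Identifies the kind of JMS connection factory: generic, queue or topic,
+ * each in a plain and an XA variant. intValue() and valueOf(int) map the
+ * constants to the ints 0-5; valueOf(int) returns XA_CF for any other value.
+ *
+ * @author howard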
+ */
+public enum JMSFactoryType
+{
+ CF, QUEUE_CF, TOPIC_CF, XA_CF, QUEUE_XA_CF, TOPIC_XA_CF;
+
+ public int intValue()
+ {
+ int val = 0;
+ switch (this)
+ {
+ case CF:
+ val = 0;
+ break;
+ case QUEUE_CF:
+ val = 1;
+ break;
+ case TOPIC_CF:
+ val = 2;
+ break;
+ case XA_CF:
+ val = 3;
+ break;
+ case QUEUE_XA_CF:
+ val = 4;
+ break;
+ case TOPIC_XA_CF:
+ val = 5;
+ break;
+ }
+ return val;
+ }
+
+ public static JMSFactoryType valueOf(int val)
+ {
+ JMSFactoryType type;
+ switch (val)
+ {
+ case 0:
+ type = CF;
+ break;
+ case 1:
+ type = QUEUE_CF;
+ break;
+ case 2:
+ type = TOPIC_CF;
+ break;
+ case 3:
+ type = XA_CF;
+ break;
+ case 4:
+ type = QUEUE_XA_CF;
+ break;
+ case 5:
+ type = TOPIC_XA_CF;
+ break;
+ default:
+ type = XA_CF;
+ break;
+ }
+ return type;
+ }
+}
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -205,7 +205,14 @@
Element e = (Element)node;
String name = node.getAttributes().getNamedItem(JMSServerConfigParserImpl.NAME_ATTR).getNodeValue();
+
+ String fact = e.getAttribute("signature");
+ boolean isXA = XMLConfigurationUtil.getBoolean(e,
+ "xa",
+ HornetQClient.DEFAULT_XA);
+ JMSFactoryType factType = resolveFactoryType(fact, isXA);
+
long clientFailureCheckPeriod = XMLConfigurationUtil.getLong(e,
"client-failure-check-period",
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
@@ -381,6 +388,7 @@
cfConfig.setConnectorNames(connectorNames);
}
+ cfConfig.setFactoryType(factType);
cfConfig.setClientID(clientID);
cfConfig.setClientFailureCheckPeriod(clientFailureCheckPeriod);
cfConfig.setConnectionTTL(connectionTTL);
@@ -413,6 +421,46 @@
return cfConfig;
}
+ private JMSFactoryType resolveFactoryType(String fact, boolean isXA) throws HornetQException
+ {
+ if ("".equals(fact))
+ {
+ fact = "generic";
+ }
+ if (isXA)
+ {
+ if ("generic".equals(fact))
+ {
+ return JMSFactoryType.XA_CF;
+ }
+ if ("queue".equals(fact))
+ {
+ return JMSFactoryType.QUEUE_XA_CF;
+ }
+ if ("topic".equals(fact))
+ {
+ return JMSFactoryType.TOPIC_XA_CF;
+ }
+ }
+ else
+ {
+ if ("generic".equals(fact))
+ {
+ return JMSFactoryType.CF;
+ }
+ if ("queue".equals(fact))
+ {
+ return JMSFactoryType.QUEUE_CF;
+ }
+ if ("topic".equals(fact))
+ {
+ return JMSFactoryType.TOPIC_CF;
+ }
+ }
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Invalid signature " + fact +
+ " at parseConnectionFactory");
+ }
+
/**
* hook for integration layers
* @param topicName
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -23,7 +23,6 @@
import javax.naming.Context;
import javax.naming.InitialContext;
-import javax.naming.NameNotFoundException;
import javax.naming.NamingException;
import org.hornetq.api.core.HornetQException;
@@ -754,6 +753,7 @@
final boolean failoverOnInitialConnection,
final boolean failoverOnServerShutdown,
final String groupId,
+ final JMSFactoryType factoryType,
String... jndiBindings) throws Exception
{
checkInitialised();
@@ -790,6 +790,7 @@
configuration.setFailoverOnInitialConnection(failoverOnInitialConnection);
configuration.setFailoverOnServerShutdown(failoverOnServerShutdown);
configuration.setGroupID(groupId);
+ configuration.setFactoryType(factoryType);
createConnectionFactory(true, configuration, jndiBindings);
}
}
@@ -969,13 +970,14 @@
final int reconnectAttempts,
final boolean failoverOnInitialConnection,
final boolean failoverOnServerShutdown,
- final String groupId) throws Exception
+ final String groupId,
+ final JMSFactoryType jmsFactoryType) throws Exception
{
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(discoveryAddress, discoveryPort);
+ cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(discoveryAddress, discoveryPort, jmsFactoryType);
cf.setClientID(clientID);
cf.setLocalBindAddress(localBindAddress);
cf.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
@@ -1043,13 +1045,14 @@
final int reconnectAttempts,
final boolean failoverOnInitialConnection,
final boolean failoverOnServerShutdown,
- final String groupId) throws Exception
+ final String groupId,
+ final JMSFactoryType jmsFactoryType) throws Exception
{
checkInitialised();
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(connectorConfigs);
+ cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactory(connectorConfigs, jmsFactoryType);
cf.setClientID(clientID);
cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
cf.setConnectionTTL(connectionTTL);
@@ -1213,7 +1216,8 @@
cfConfig.getReconnectAttempts(),
cfConfig.isFailoverOnInitialConnection(),
cfConfig.isFailoverOnServerShutdown(),
- cfConfig.getGroupID());
+ cfConfig.getGroupID(),
+ cfConfig.getFactoryType());
}
else
{
@@ -1247,7 +1251,8 @@
cfConfig.getReconnectAttempts(),
cfConfig.isFailoverOnInitialConnection(),
cfConfig.isFailoverOnServerShutdown(),
- cfConfig.getGroupID());
+ cfConfig.getGroupID(),
+ cfConfig.getFactoryType());
}
connectionFactories.put(cfConfig.getName(), cf);
@@ -1258,6 +1263,7 @@
public synchronized void createConnectionFactory(final String name,
final TransportConfiguration liveTC,
+ final JMSFactoryType cfType,
final String... jndiBindings) throws Exception
{
checkInitialised();
@@ -1265,6 +1271,7 @@
if (cf == null)
{
ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, liveTC);
+ configuration.setFactoryType(cfType);
createConnectionFactory(true, configuration, jndiBindings);
}
}
Modified: trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2010-10-11 01:01:04 UTC (rev 9764)
+++ trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2010-10-11 01:04:43 UTC (rev 9765)
@@ -35,6 +35,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.logging.Logger;
import org.hornetq.ra.inflow.HornetQActivation;
@@ -1389,13 +1390,13 @@
: new TransportConfiguration(backUpCOnnectorClassname,
backupConnectionParams);
- cf = HornetQJMSClient.createConnectionFactory(transportConf, backup);
+ cf = HornetQJMSClient.createConnectionFactory(transportConf, backup, JMSFactoryType.XA_CF);
}
else if (discoveryAddress != null)
{
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort()
: getDiscoveryPort();
- cf = HornetQJMSClient.createConnectionFactory(discoveryAddress, discoveryPort);
+ cf = HornetQJMSClient.createConnectionFactory(discoveryAddress, discoveryPort, JMSFactoryType.XA_CF);
}
else
{
13 years, 6 months
JBoss hornetq SVN: r9764 - in trunk/tests: jms-tests/src/org/hornetq/jms/tests and 12 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-10-10 21:01:04 -0400 (Sun, 10 Oct 2010)
New Revision: 9764
Modified:
trunk/tests/jms-tests/config/hornetq-jms.xml
trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionClosedTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionFactoryTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/JMSXDeliveryCountTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
trunk/tests/joram-tests/src/org/hornetq/jms/HornetQAdmin.java
trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/AutoGroupingTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/GroupIDTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
HORNETQ-515 -- test changes
Modified: trunk/tests/jms-tests/config/hornetq-jms.xml
===================================================================
--- trunk/tests/jms-tests/config/hornetq-jms.xml 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/jms-tests/config/hornetq-jms.xml 2010-10-11 01:01:04 UTC (rev 9764)
@@ -15,4 +15,112 @@
</entries>
</connection-factory>
-</configuration>
\ No newline at end of file
+ <connection-factory name="JMSConnectionFactory1">
+ <xa>true</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_XA_TRUE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory2">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_XA_FALSE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory3" signature="generic">
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_GENERIC"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory4" signature="generic">
+ <xa>true</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_GENERIC_XA_TRUE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory5" signature="generic">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_GENERIC_XA_FALSE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory6" signature="queue">
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_QUEUE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory7" signature="queue">
+ <xa>true</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_QUEUE_XA_TRUE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory8" signature="queue">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_QUEUE_XA_FALSE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory9" signature="topic">
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_TOPIC"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory10" signature="topic">
+ <xa>true</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_TOPIC_XA_TRUE"/>
+ </entries>
+ </connection-factory>
+
+ <connection-factory name="JMSConnectionFactory11" signature="topic">
+ <xa>false</xa>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
+ <entries>
+ <entry name="/CF_TOPIC_XA_FALSE"/>
+ </entries>
+ </connection-factory>
+
+</configuration>
+
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -25,6 +25,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* Safeguards for previously detected TCK failures.
@@ -95,6 +96,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
null,
+ JMSFactoryType.CF,
"/StrictTCKConnectionFactory");
CTSMiscellaneousTest.cf = (HornetQConnectionFactory)getInitialContext().lookup("/StrictTCKConnectionFactory");
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionClosedTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionClosedTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionClosedTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -66,8 +66,8 @@
/** See TCK test: topicconntests.connNotStartedTopicTest */
public void testCannotReceiveMessageOnStoppedConnection() throws Exception
{
- TopicConnection conn1 = ((TopicConnectionFactory)JMSTestCase.cf).createTopicConnection();
- TopicConnection conn2 = ((TopicConnectionFactory)JMSTestCase.cf).createTopicConnection();
+ TopicConnection conn1 = ((TopicConnectionFactory)JMSTestCase.topicCf).createTopicConnection();
+ TopicConnection conn2 = ((TopicConnectionFactory)JMSTestCase.topicCf).createTopicConnection();
TopicSession sess1 = conn1.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSession sess2 = conn2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionFactoryTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionFactoryTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionFactoryTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -28,7 +28,11 @@
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XAQueueConnectionFactory;
+import javax.jms.XATopicConnectionFactory;
+import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.tests.util.ProxyAssertSupport;
/**
@@ -62,7 +66,7 @@
*/
public void testQueueConnectionFactory() throws Exception
{
- QueueConnectionFactory qcf = (QueueConnectionFactory)JMSTestCase.ic.lookup("/ConnectionFactory");
+ QueueConnectionFactory qcf = (QueueConnectionFactory)JMSTestCase.ic.lookup("/CF_QUEUE_XA_FALSE");
QueueConnection qc = qcf.createQueueConnection();
qc.close();
}
@@ -73,7 +77,7 @@
*/
public void testTopicConnectionFactory() throws Exception
{
- TopicConnectionFactory qcf = (TopicConnectionFactory)JMSTestCase.ic.lookup("/ConnectionFactory");
+ TopicConnectionFactory qcf = (TopicConnectionFactory)JMSTestCase.ic.lookup("/CF_TOPIC_XA_FALSE");
TopicConnection tc = qcf.createTopicConnection();
tc.close();
}
@@ -107,7 +111,7 @@
// the ConnectionFactories that ship with HornetQ do not have their clientID
// administratively configured.
- ConnectionFactory cf = (ConnectionFactory)JMSTestCase.ic.lookup("/ConnectionFactory");
+ ConnectionFactory cf = (ConnectionFactory)JMSTestCase.ic.lookup("/CF_XA_FALSE");
Connection c = cf.createConnection();
ProxyAssertSupport.assertNull(c.getClientID());
@@ -120,7 +124,7 @@
// the ConnectionFactories that ship with HornetQ do not have their clientID
// administratively configured.
- ConnectionFactory cf = (ConnectionFactory)JMSTestCase.ic.lookup("/ConnectionFactory");
+ ConnectionFactory cf = (ConnectionFactory)JMSTestCase.ic.lookup("/CF_XA_FALSE");
Connection c = cf.createConnection();
// set the client id immediately after the connection is created
@@ -320,7 +324,102 @@
}
}
+
+ public void testFactoryTypes() throws Exception
+ {
+ HornetQConnectionFactory factory = null;
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/ConnectionFactory");
+
+ assertTrue(factory instanceof ConnectionFactory);
+ assertEquals(1, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_XA_TRUE");
+
+ assertTrue(factory instanceof XAConnectionFactory);
+ assertEquals(1, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_XA_FALSE");
+
+ assertTrue(factory instanceof ConnectionFactory);
+ assertEquals(1, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_GENERIC");
+
+ assertTrue(factory instanceof ConnectionFactory);
+ assertEquals(1, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_GENERIC_XA_TRUE");
+
+ assertTrue(factory instanceof XAConnectionFactory);
+ assertEquals(1, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_GENERIC_XA_FALSE");
+
+ assertTrue(factory instanceof ConnectionFactory);
+ assertEquals(1, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_QUEUE");
+
+ assertTrue(factory instanceof QueueConnectionFactory);
+ assertEquals(2, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_QUEUE_XA_TRUE");
+
+ assertTrue(factory instanceof XAQueueConnectionFactory);
+ assertEquals(4, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_QUEUE_XA_FALSE");
+
+ assertTrue(factory instanceof QueueConnectionFactory);
+ assertEquals(2, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_TOPIC");
+
+ assertTrue(factory instanceof TopicConnectionFactory);
+ assertEquals(2, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_TOPIC_XA_TRUE");
+
+ assertTrue(factory instanceof XATopicConnectionFactory);
+ assertEquals(4, getTypes(factory));
+
+ factory = (HornetQConnectionFactory)JMSTestCase.ic.lookup("/CF_TOPIC_XA_FALSE");
+
+ assertTrue(factory instanceof TopicConnectionFactory);
+ assertEquals(2, getTypes(factory));
+ }
+ private int getTypes(HornetQConnectionFactory factory)
+ {
+ int num = 0;
+ if (factory instanceof ConnectionFactory)
+ {
+ num++;
+ }
+ if (factory instanceof XAConnectionFactory)
+ {
+ num++;
+ }
+ if (factory instanceof QueueConnectionFactory)
+ {
+ num++;
+ }
+ if (factory instanceof TopicConnectionFactory)
+ {
+ num++;
+ }
+ if (factory instanceof XAQueueConnectionFactory)
+ {
+ num++;
+ }
+ if (factory instanceof XATopicConnectionFactory)
+ {
+ num++;
+ }
+ return num;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -211,7 +211,7 @@
*/
public void testQueueConnection1() throws Exception
{
- QueueConnectionFactory qcf = JMSTestCase.cf;
+ QueueConnectionFactory qcf = JMSTestCase.queueCf;
QueueConnection qc = qcf.createQueueConnection();
@@ -225,7 +225,7 @@
*/
public void testQueueConnection2() throws Exception
{
- TopicConnectionFactory tcf = JMSTestCase.cf;
+ TopicConnectionFactory tcf = JMSTestCase.topicCf;
TopicConnection tc = tcf.createTopicConnection();
@@ -280,7 +280,7 @@
*/
public void testDurableSubscriberOnQueueConnection() throws Exception
{
- QueueConnection queueConnection = ((QueueConnectionFactory)JMSTestCase.cf).createQueueConnection();
+ QueueConnection queueConnection = ((QueueConnectionFactory)JMSTestCase.queueCf).createQueueConnection();
try
{
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -318,12 +318,12 @@
public TopicConnectionFactory getTopicConnectionFactory() throws Exception
{
- return (TopicConnectionFactory)getInitialContext().lookup("/ConnectionFactory");
+ return (TopicConnectionFactory)getInitialContext().lookup("/CF_TOPIC");
}
public XAConnectionFactory getXAConnectionFactory() throws Exception
{
- return (XAConnectionFactory)getInitialContext().lookup("/ConnectionFactory");
+ return (XAConnectionFactory)getInitialContext().lookup("/CF_XA_TRUE");
}
public InitialContext getInitialContext(final int serverid) throws Exception
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -22,6 +22,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.jms.client.HornetQQueueConnectionFactory;
+import org.hornetq.jms.client.HornetQTopicConnectionFactory;
+import org.hornetq.jms.server.impl.JMSFactoryType;
/**
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
@@ -32,8 +36,12 @@
public class JMSTestCase extends HornetQServerTestCase
{
- protected static HornetQConnectionFactory cf;
+ protected static HornetQJMSConnectionFactory cf;
+ protected static HornetQQueueConnectionFactory queueCf;
+
+ protected static HornetQTopicConnectionFactory topicCf;
+
protected static InitialContext ic;
protected static final String defaultConf = "all";
@@ -91,10 +99,81 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
null,
+ JMSFactoryType.CF,
"/testsuitecf");
- JMSTestCase.cf = (HornetQConnectionFactory)getInitialContext().lookup("/testsuitecf");
+ getJmsServerManager().createConnectionFactory("testsuitecf_queue",
+ connectorConfigs,
+ null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ HornetQClient.DEFAULT_CALL_TIMEOUT,
+ HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
+ HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
+ HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE,
+ HornetQClient.DEFAULT_PRODUCER_MAX_RATE,
+ true,
+ true,
+ true,
+ HornetQClient.DEFAULT_AUTO_GROUP,
+ HornetQClient.DEFAULT_PRE_ACKNOWLEDGE,
+ HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ HornetQClient.DEFAULT_ACK_BATCH_SIZE,
+ HornetQClient.DEFAULT_ACK_BATCH_SIZE,
+ HornetQClient.DEFAULT_USE_GLOBAL_POOLS,
+ HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE,
+ HornetQClient.DEFAULT_RETRY_INTERVAL,
+ HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ HornetQClient.DEFAULT_RECONNECT_ATTEMPTS,
+ HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
+ HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+ null,
+ JMSFactoryType.QUEUE_CF,
+ "/testsuitecf_queue");
+ getJmsServerManager().createConnectionFactory("testsuitecf_topic",
+ connectorConfigs,
+ null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ HornetQClient.DEFAULT_CALL_TIMEOUT,
+ HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
+ HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
+ HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE,
+ HornetQClient.DEFAULT_PRODUCER_MAX_RATE,
+ true,
+ true,
+ true,
+ HornetQClient.DEFAULT_AUTO_GROUP,
+ HornetQClient.DEFAULT_PRE_ACKNOWLEDGE,
+ HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ HornetQClient.DEFAULT_ACK_BATCH_SIZE,
+ HornetQClient.DEFAULT_ACK_BATCH_SIZE,
+ HornetQClient.DEFAULT_USE_GLOBAL_POOLS,
+ HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE,
+ HornetQClient.DEFAULT_RETRY_INTERVAL,
+ HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ HornetQClient.DEFAULT_RECONNECT_ATTEMPTS,
+ HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
+ HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+ null,
+ JMSFactoryType.TOPIC_CF,
+ "/testsuitecf_topic");
+
+ JMSTestCase.cf = (HornetQJMSConnectionFactory)getInitialContext().lookup("/testsuitecf");
+ JMSTestCase.queueCf = (HornetQQueueConnectionFactory)getInitialContext().lookup("/testsuitecf_queue");
+ JMSTestCase.topicCf = (HornetQTopicConnectionFactory)getInitialContext().lookup("/testsuitecf_topic");
+
assertRemainingMessages(0);
}
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -27,6 +27,7 @@
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.client.HornetQTopic;
import org.hornetq.jms.referenceable.ConnectionFactoryObjectFactory;
@@ -88,7 +89,7 @@
ProxyAssertSupport.assertTrue(instance instanceof HornetQConnectionFactory);
- HornetQConnectionFactory cf2 = (HornetQConnectionFactory)instance;
+ HornetQJMSConnectionFactory cf2 = (HornetQJMSConnectionFactory)instance;
simpleSendReceive(cf2, HornetQServerTestCase.queue1);
}
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/JMSXDeliveryCountTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/JMSXDeliveryCountTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/JMSXDeliveryCountTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -499,7 +499,7 @@
producer.send(tm);
- xaConn = ((XAConnectionFactory)getConnectionFactory()).createXAConnection();
+ xaConn = ((XAConnectionFactory)getXAConnectionFactory()).createXAConnection();
XASession consumerSess = xaConn.createXASession();
MessageConsumer consumer = consumerSess.createConsumer(HornetQServerTestCase.queue1);
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -38,6 +38,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.integration.bootstrap.HornetQBootstrapServer;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.jboss.kernel.plugins.config.property.PropertyKernelConfig;
/**
@@ -321,6 +322,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
null,
+ JMSFactoryType.CF,
jndiBindings);
}
Modified: trunk/tests/joram-tests/src/org/hornetq/jms/HornetQAdmin.java
===================================================================
--- trunk/tests/joram-tests/src/org/hornetq/jms/HornetQAdmin.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/joram-tests/src/org/hornetq/jms/HornetQAdmin.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -129,7 +129,19 @@
public void createQueueConnectionFactory(final String name)
{
- createConnectionFactory(name);
+ try
+ {
+ invokeSyncOperation(ResourceNames.JMS_SERVER,
+ "createQueueConnectionFactory",
+ name,
+ NettyConnectorFactory.class.getName(),
+ new HashMap<String, Object>(),
+ new String[] { name });
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e);
+ }
}
public void createTopic(final String name)
@@ -148,7 +160,19 @@
public void createTopicConnectionFactory(final String name)
{
- createConnectionFactory(name);
+ try
+ {
+ invokeSyncOperation(ResourceNames.JMS_SERVER,
+ "createTopicConnectionFactory",
+ name,
+ NettyConnectorFactory.class.getName(),
+ new HashMap<String, Object>(),
+ new String[] { name });
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e);
+ }
}
public void deleteConnectionFactory(final String name)
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -35,6 +35,7 @@
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.UnitTestCase;
@@ -159,6 +160,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
failoverOnServerShutdown,
null,
+ JMSFactoryType.CF,
"/cf");
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -28,6 +28,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.Configuration;
@@ -172,7 +173,7 @@
public void testDiscoveryConstructor() throws Exception
{
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(groupAddress, groupPort);
+ HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(groupAddress, groupPort, JMSFactoryType.CF);
assertFactoryParams(cf,
null,
groupAddress,
@@ -219,7 +220,7 @@
backupTC);
staticConnectors.add(pair0);
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(staticConnectors);
+ HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(staticConnectors, JMSFactoryType.CF);
assertFactoryParams(cf,
staticConnectors,
null,
@@ -267,7 +268,7 @@
backupTC);
staticConnectors.add(pair0);
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(liveTC, backupTC);
+ HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(liveTC, backupTC, JMSFactoryType.CF);
assertFactoryParams(cf,
staticConnectors,
null,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -28,6 +28,7 @@
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import javax.jms.XAConnectionFactory;
import javax.transaction.TransactionManager;
import junit.framework.Assert;
@@ -50,8 +51,9 @@
import org.hornetq.jms.bridge.ConnectionFactoryFactory;
import org.hornetq.jms.bridge.DestinationFactory;
import org.hornetq.jms.bridge.QualityOfServiceMode;
-import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.jms.client.HornetQXAConnectionFactory;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.unit.util.InVMContext;
@@ -73,8 +75,12 @@
protected ConnectionFactoryFactory cff0, cff1;
+ protected ConnectionFactoryFactory cff0xa, cff1xa;
+
protected ConnectionFactory cf0, cf1;
+ protected XAConnectionFactory cf0xa, cf1xa;
+
protected DestinationFactory sourceQueueFactory, targetQueueFactory, localTargetQueueFactory, sourceTopicFactory;
protected Queue sourceQueue, targetQueue, localTargetQueue;
@@ -173,8 +179,12 @@
server0.stop();
cff0 = cff1 = null;
+
+ cff0xa = cff1xa = null;
cf0 = cf1 = null;
+
+ cf0xa = cf1xa = null;
sourceQueueFactory = targetQueueFactory = localTargetQueueFactory = sourceTopicFactory = null;
@@ -203,7 +213,7 @@
{
public ConnectionFactory createConnectionFactory() throws Exception
{
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
// Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
cf.setReconnectAttempts(0);
@@ -216,14 +226,32 @@
};
- cf0 = cff0.createConnectionFactory();
+ cff0xa = new ConnectionFactoryFactory()
+ {
+ public Object createConnectionFactory() throws Exception
+ {
+ HornetQXAConnectionFactory cf = HornetQJMSClient.createXAConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
+ cf.setReconnectAttempts(0);
+ cf.setBlockOnNonDurableSend(true);
+ cf.setBlockOnDurableSend(true);
+ cf.setCacheLargeMessagesClient(true);
+
+ return cf;
+ }
+
+ };
+
+ cf0 = (ConnectionFactory)cff0.createConnectionFactory();
+ cf0xa = (XAConnectionFactory)cff0xa.createConnectionFactory();
+
cff1 = new ConnectionFactoryFactory()
{
public ConnectionFactory createConnectionFactory() throws Exception
{
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName(),
+ HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName(),
params1));
// Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
@@ -236,8 +264,27 @@
}
};
- cf1 = cff1.createConnectionFactory();
+ cff1xa = new ConnectionFactoryFactory()
+ {
+ public XAConnectionFactory createConnectionFactory() throws Exception
+ {
+ HornetQXAConnectionFactory cf = (HornetQXAConnectionFactory) HornetQJMSClient.createXAConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName(),
+ params1));
+
+ // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
+ cf.setReconnectAttempts(0);
+ cf.setBlockOnNonDurableSend(true);
+ cf.setBlockOnDurableSend(true);
+ cf.setCacheLargeMessagesClient(true);
+
+ return cf;
+ }
+ };
+
+ cf1 = (ConnectionFactory)cff1.createConnectionFactory();
+ cf1xa = (XAConnectionFactory)cff1xa.createConnectionFactory();
+
sourceQueueFactory = new DestinationFactory()
{
public Destination createDestination() throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -18,6 +18,7 @@
import junit.framework.Assert;
import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.bridge.ConnectionFactoryFactory;
import org.hornetq.jms.bridge.QualityOfServiceMode;
import org.hornetq.jms.bridge.impl.JMSBridgeImpl;
@@ -196,10 +197,18 @@
{
JMSBridgeImpl bridge = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ ConnectionFactoryFactory factInUse1 = cff1;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ factInUse1 = cff1xa;
+ }
+
try
{
- bridge = new JMSBridgeImpl(cff0,
- cff1,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse1,
sourceQueueFactory,
targetQueueFactory,
null,
@@ -290,8 +299,8 @@
try
{
- bridge = new JMSBridgeImpl(cff0,
- cff1,
+ bridge = new JMSBridgeImpl(cff0xa,
+ cff1xa,
sourceQueueFactory,
targetQueueFactory,
null,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -33,6 +33,7 @@
import org.hornetq.api.jms.HornetQJMSConstants;
import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.bridge.ConnectionFactoryFactory;
import org.hornetq.jms.bridge.QualityOfServiceMode;
import org.hornetq.jms.bridge.impl.JMSBridgeImpl;
import org.hornetq.jms.client.HornetQMessage;
@@ -1444,10 +1445,18 @@
Thread t = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ ConnectionFactoryFactory factInUse1 = cff1;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ factInUse1 = cff1xa;
+ }
+
try
{
- bridge = new JMSBridgeImpl(cff0,
- cff1,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse1,
sourceQueueFactory,
targetQueueFactory,
null,
@@ -1530,10 +1539,18 @@
Thread t = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ ConnectionFactoryFactory factInUse1 = cff1;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ factInUse1 = cff1xa;
+ }
+
try
{
- bridge = new JMSBridgeImpl(cff0,
- cff1,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse1,
sourceQueueFactory,
targetQueueFactory,
null,
@@ -1617,10 +1634,16 @@
Thread t = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ }
+
try
{
- bridge = new JMSBridgeImpl(cff0,
- cff0,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse0,
sourceQueueFactory,
localTargetQueueFactory,
null,
@@ -1699,12 +1722,19 @@
{
JMSBridgeImpl bridge = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ ConnectionFactoryFactory factInUse1 = cff1;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ factInUse1 = cff1xa;
+ }
try
{
final int NUM_MESSAGES = 10;
- bridge = new JMSBridgeImpl(cff0,
- cff1,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse1,
sourceQueueFactory,
targetQueueFactory,
null,
@@ -1770,12 +1800,18 @@
{
JMSBridgeImpl bridge = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ }
+
try
{
final int NUM_MESSAGES = 10;
- bridge = new JMSBridgeImpl(cff0,
- cff0,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse0,
sourceQueueFactory,
localTargetQueueFactory,
null,
@@ -1840,14 +1876,22 @@
{
JMSBridgeImpl bridge = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ ConnectionFactoryFactory factInUse1 = cff1;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ factInUse1 = cff1xa;
+ }
+
try
{
final long MAX_BATCH_TIME = 3000;
final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
- bridge = new JMSBridgeImpl(cff0,
- cff1,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse1,
sourceQueueFactory,
targetQueueFactory,
null,
@@ -1894,14 +1938,20 @@
{
JMSBridgeImpl bridge = null;
+ ConnectionFactoryFactory factInUse0 = cff0;
+ if (qosMode.equals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE))
+ {
+ factInUse0 = cff0xa;
+ }
+
try
{
final long MAX_BATCH_TIME = 3000;
final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
- bridge = new JMSBridgeImpl(cff0,
- cff0,
+ bridge = new JMSBridgeImpl(factInUse0,
+ factInUse0,
sourceQueueFactory,
localTargetQueueFactory,
null,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/AutoGroupingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/AutoGroupingTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/AutoGroupingTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -19,7 +19,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
-import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
/**
* A AutoGroupingTest
@@ -34,7 +34,7 @@
@Override
protected ConnectionFactory getCF() throws Exception
{
- HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ HornetQJMSConnectionFactory cf = HornetQJMSClient.createConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
cf.setAutoGroup(true);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/GroupIDTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/GroupIDTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/GroupIDTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -19,7 +19,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
-import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
/**
* A GroupIDTest
@@ -34,7 +34,7 @@
@Override
protected ConnectionFactory getCF() throws Exception
{
- HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ HornetQJMSConnectionFactory cf = HornetQJMSClient.createConnectionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
cf.setGroupID("wibble");
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -27,6 +27,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.tests.util.JMSTestBase;
/**
@@ -229,6 +230,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
failoverOnServerShutdown,
null,
+ JMSFactoryType.CF,
jndiBindings);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -34,6 +34,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.jms.HornetQJMSConstants;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.tests.util.JMSTestBase;
import org.hornetq.tests.util.UnitTestCase;
@@ -328,6 +329,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
failoverOnServerShutdown,
null,
+ JMSFactoryType.CF,
jndiBindings);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -34,6 +34,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.util.JMSTestBase;
@@ -97,6 +98,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
false,
null,
+ JMSFactoryType.CF,
"/cffoo");
cf = (ConnectionFactory)context.lookup("/cffoo");
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -27,6 +27,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.tests.util.JMSTestBase;
import org.hornetq.tests.util.RandomUtil;
@@ -264,6 +265,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
failoverOnServerShutdown,
null,
+ JMSFactoryType.CF,
jndiBindings);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -47,6 +47,7 @@
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQSession;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.unit.util.InVMContext;
@@ -159,7 +160,7 @@
{
HornetQConnectionFactory jbcf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"),
new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory",
- backupParams));
+ backupParams), JMSFactoryType.CF);
jbcf.setBlockOnDurableSend(true);
jbcf.setBlockOnNonDurableSend(true);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -30,6 +30,7 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.tests.util.JMSTestBase;
/**
@@ -175,6 +176,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
failoverOnServerShutdown,
null,
+ JMSFactoryType.CF,
jndiBindings);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -28,6 +28,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.tests.util.RandomUtil;
@@ -65,7 +66,7 @@
final long connectionTTL,
final long clientFailureCheckPeriod) throws JMSException
{
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(connectorFactory));
+ HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(connectorFactory));
cf.setBlockOnNonDurableSend(true);
cf.setBlockOnDurableSend(true);
@@ -103,7 +104,7 @@
public static String[] sendMessages(final Destination destination, final int messagesToSend) throws Exception
{
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
return JMSUtil.sendMessages(cf, destination, messagesToSend);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -47,7 +47,7 @@
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
-import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
@@ -161,7 +161,7 @@
protected ConnectionFactory createConnectionFactory()
{
- return new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ return new HornetQJMSConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
}
protected Socket createSocket() throws IOException
Modified: trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -53,6 +53,7 @@
import org.hornetq.jms.bridge.QualityOfServiceMode;
import org.hornetq.jms.bridge.impl.JMSBridgeImpl;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.unit.util.InVMContext;
@@ -157,7 +158,7 @@
private static ConnectionFactory createConnectionFactory()
{
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
// Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
cf.setReconnectAttempts(0);
cf.setBlockOnNonDurableSend(true);
@@ -171,8 +172,10 @@
public void testStartWithRepeatedFailure() throws Exception
{
- HornetQConnectionFactory failingSourceCF = new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
+ HornetQJMSConnectionFactory failingSourceCF = new HornetQJMSConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
{
+ private static final long serialVersionUID = -2142578705002528826L;
+
@Override
public Connection createConnection() throws JMSException
{
@@ -212,8 +215,9 @@
public void testStartWithFailureThenSuccess() throws Exception
{
- HornetQConnectionFactory failingSourceCF = new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
+ HornetQJMSConnectionFactory failingSourceCF = new HornetQJMSConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
{
+ private static final long serialVersionUID = 1274250681150776714L;
boolean firstTime = true;
@Override
@@ -410,8 +414,10 @@
public void testExceptionOnSourceAndRetrySucceeds() throws Exception
{
final AtomicReference<Connection> sourceConn = new AtomicReference<Connection>();
- HornetQConnectionFactory failingSourceCF = new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
+ HornetQJMSConnectionFactory failingSourceCF = new HornetQJMSConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
{
+ private static final long serialVersionUID = -6930787952179727779L;
+
@Override
public Connection createConnection() throws JMSException
{
@@ -460,8 +466,9 @@
public void testExceptionOnSourceAndRetryFails() throws Exception
{
final AtomicReference<Connection> sourceConn = new AtomicReference<Connection>();
- HornetQConnectionFactory failingSourceCF = new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
+ HornetQJMSConnectionFactory failingSourceCF = new HornetQJMSConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()))
{
+ private static final long serialVersionUID = 4163579449500727852L;
boolean firstTime = true;
@Override
Modified: trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-10-11 00:56:51 UTC (rev 9763)
+++ trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-10-11 01:01:04 UTC (rev 9764)
@@ -29,6 +29,7 @@
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.server.impl.JMSFactoryType;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.unit.util.InVMContext;
@@ -213,6 +214,7 @@
HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION,
failoverOnServerShutdown,
null,
+ JMSFactoryType.CF,
jndiBindings);
}
13 years, 6 months