JBoss hornetq SVN: r11420 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-26 06:36:41 -0400 (Mon, 26 Sep 2011)
New Revision: 11420
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 do not log errors if the server is being stopped
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-26 10:36:08 UTC (rev 11419)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-26 10:36:41 UTC (rev 11420)
@@ -612,7 +612,11 @@
}
catch (Exception e)
{
+ if (e instanceof InterruptedException && !started)
+ // do not log errors if the server is being stopped.
+ return;
log.error("Failure in initialisation", e);
+ e.printStackTrace();
}
}
@@ -752,7 +756,9 @@
}
connectorsService.stop();
- //we stop the groupinghandler before we stop te cluster manager so binding mappings aren't removed in case of failover
+
+ // we stop the groupingHandler before we stop the cluster manager so binding mappings
+ // aren't removed in case of failover
if (groupingHandler != null)
{
managementService.removeNotificationListener(groupingHandler);
12 years, 9 months
JBoss hornetq SVN: r11419 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-26 06:36:08 -0400 (Mon, 26 Sep 2011)
New Revision: 11419
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java
Log:
XXX TEMPORARILY Turn off noisy warnings
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-09-26 10:35:30 UTC (rev 11418)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-09-26 10:36:08 UTC (rev 11419)
@@ -346,7 +346,7 @@
if (httpEnabled)
{
handlers.put("http-decoder", new HttpRequestDecoder());
-
+
handlers.put("http-aggregator", new HttpChunkAggregator(Integer.MAX_VALUE));
handlers.put("http-encoder", new HttpResponseEncoder());
@@ -425,7 +425,7 @@
paused = false;
- if (!Version.ID.equals(VersionLoader.getVersion().getNettyVersion()))
+ if (!Version.ID.equals(VersionLoader.getVersion().getNettyVersion()) && false)
{
NettyAcceptor.log.warn("Unexpected Netty Version was expecting " + VersionLoader.getVersion()
.getNettyVersion() +
@@ -625,7 +625,7 @@
return new String[0];
}
String[] hosts = commaSeparatedHosts.split(",");
-
+
for (int i = 0; i < hosts.length; i++)
{
hosts[i] = hosts[i].trim();
@@ -709,12 +709,12 @@
public void connectionReadyForWrites(final Object connectionID, boolean ready)
{
NettyConnection conn = connections.get(connectionID);
-
+
if (conn != null)
{
conn.fireReady(ready);
- }
- }
+ }
+ }
}
private class BatchFlusher implements Runnable
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2011-09-26 10:35:30 UTC (rev 11418)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2011-09-26 10:36:08 UTC (rev 11419)
@@ -143,9 +143,9 @@
private final ScheduledExecutorService scheduledThreadPool;
private final Executor closeExecutor;
-
+
private BatchFlusher flusher;
-
+
private ScheduledFuture<?> batchFlusherFuture;
// Static --------------------------------------------------------
@@ -355,7 +355,7 @@
handlers.add(new HttpRequestEncoder());
handlers.add(new HttpResponseDecoder());
-
+
handlers.add(new HttpChunkAggregator(Integer.MAX_VALUE));
handlers.add(new HttpHandler());
@@ -374,11 +374,11 @@
if (batchDelay > 0)
{
flusher = new BatchFlusher();
-
+
batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS);
}
- if (!Version.ID.equals(VersionLoader.getVersion().getNettyVersion()))
+ if (!Version.ID.equals(VersionLoader.getVersion().getNettyVersion()) && false)
{
NettyConnector.log.warn("Unexpected Netty Version was expecting " + VersionLoader.getVersion()
.getNettyVersion() +
@@ -387,22 +387,22 @@
}
NettyConnector.log.debug("Started Netty Connector version " + Version.ID);
}
-
+
public synchronized void close()
{
if (channelFactory == null)
{
return;
}
-
+
if (batchFlusherFuture != null)
{
batchFlusherFuture.cancel(false);
-
+
flusher.cancel();
-
+
flusher = null;
-
+
batchFlusherFuture = null;
}
@@ -533,7 +533,7 @@
private String cookie;
private final CookieEncoder cookieEncoder = new CookieEncoder(false);
-
+
public HttpHandler() throws Exception
{
url = new URI("http", null, host, port, servletPath, null, null).toString();
@@ -705,10 +705,10 @@
public void connectionReadyForWrites(Object connectionID, boolean ready)
{
}
-
-
+
+
}
-
+
private class BatchFlusher implements Runnable
{
private boolean cancelled;
12 years, 9 months
JBoss hornetq SVN: r11418 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/replication/impl and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-26 06:35:30 -0400 (Mon, 26 Sep 2011)
New Revision: 11418
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
Log:
HORNETQ-720 Close the replicator in case we close the JournalStorageManager,
make sure we never send anything when replication is !enabled
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-26 10:34:14 UTC (rev 11417)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-26 10:35:30 UTC (rev 11418)
@@ -2055,8 +2055,10 @@
idGenerator.close();
}
+ if (replicator != null)
+ replicator.stop();
+
bindingsJournal.stop();
-
messageJournal.stop();
singleThreadExecutor.shutdown();
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-26 10:34:14 UTC (rev 11417)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-26 10:35:30 UTC (rev 11418)
@@ -101,10 +101,7 @@
public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport record)
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType, record));
- }
+ sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType, record));
}
/* (non-Javadoc)
@@ -115,10 +112,7 @@
final byte recordType,
final EncodingSupport record) throws Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType, record));
- }
+ sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType, record));
}
/* (non-Javadoc)
@@ -126,10 +120,7 @@
*/
public void appendDeleteRecord(final byte journalID, final long id) throws Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
- }
+ sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
}
public void appendAddRecordTransactional(final byte journalID,
@@ -138,10 +129,7 @@
final byte recordType,
final EncodingSupport record) throws Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationAddTXMessage(journalID, false, txID, id, recordType, record));
- }
+ sendReplicatePacket(new ReplicationAddTXMessage(journalID, false, txID, id, recordType, record));
}
/* (non-Javadoc)
@@ -153,10 +141,7 @@
final byte recordType,
final EncodingSupport record) throws Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationAddTXMessage(journalID, true, txID, id, recordType, record));
- }
+ sendReplicatePacket(new ReplicationAddTXMessage(journalID, true, txID, id, recordType, record));
}
/* (non-Javadoc)
@@ -164,10 +149,7 @@
*/
public void appendCommitRecord(final byte journalID, final long txID, final boolean lineUp) throws Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
- }
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
}
/* (non-Javadoc)
@@ -178,10 +160,7 @@
final long id,
final EncodingSupport record) throws Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, record));
- }
+ sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, record));
}
/* (non-Javadoc)
@@ -189,10 +168,7 @@
*/
public void appendDeleteRecordTransactional(final byte journalID, final long txID, final long id) throws Exception
{
- if (enabled)
- {
sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, NullEncoding.instance));
- }
}
/* (non-Javadoc)
@@ -200,10 +176,7 @@
*/
public void appendPrepareRecord(final byte journalID, final long txID, final EncodingSupport transactionData) throws Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID, transactionData));
- }
+ sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID, transactionData));
}
/* (non-Javadoc)
@@ -211,10 +184,7 @@
*/
public void appendRollbackRecord(final byte journalID, final long txID) throws Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
- }
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
}
/* (non-Javadoc)
@@ -222,10 +192,7 @@
*/
public void pageClosed(final SimpleString storeName, final int pageNumber)
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, false));
- }
+ sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, false));
}
/* (non-Javadoc)
@@ -233,10 +200,7 @@
*/
public void pageDeleted(final SimpleString storeName, final int pageNumber)
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, true));
- }
+ sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, true));
}
/* (non-Javadoc)
@@ -244,10 +208,7 @@
*/
public void pageWrite(final PagedMessage message, final int pageNumber)
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber));
- }
+ sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber));
}
/* (non-Javadoc)
@@ -255,10 +216,7 @@
*/
public void largeMessageBegin(final long messageId)
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationLargeMessageBeingMessage(messageId));
- }
+ sendReplicatePacket(new ReplicationLargeMessageBeingMessage(messageId));
}
/* XXX Unused? */
@@ -275,10 +233,7 @@
*/
public void largeMessageWrite(final long messageId, final byte[] body)
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationLargeMessageWriteMessage(messageId, body));
- }
+ sendReplicatePacket(new ReplicationLargeMessageWriteMessage(messageId, body));
}
/* (non-Javadoc)
@@ -312,6 +267,7 @@
*/
public void stop() throws Exception
{
+
if (!started)
{
return;
@@ -333,7 +289,6 @@
ReplicationManagerImpl.log.warn("Error completing callback on replication manager", e);
}
}
-
if (replicatingChannel != null)
{
replicatingChannel.close();
@@ -372,6 +327,7 @@
public void compareJournals(final JournalLoadInformation[] journalInfo) throws HornetQException
{
+ if (enabled)
replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
}
@@ -388,6 +344,8 @@
private void sendReplicatePacket(final Packet packet, boolean lineUp)
{
+ if (!enabled)
+ return;
boolean runItNow = false;
OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
@@ -506,9 +464,12 @@
@Override
public void syncJournalFile(JournalFile jf, JournalContent content) throws Exception
{
- SequentialFile file = jf.getFile().copy();
- log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
- sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
+ if (enabled)
+ {
+ SequentialFile file = jf.getFile().copy();
+ log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
+ sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
+ }
}
@Override
@@ -536,6 +497,8 @@
long maxBytesToSend)
throws Exception
{
+ if (!enabled)
+ return;
if (!file.isOpen())
{
file.open(1, false);
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java 2011-09-26 10:34:14 UTC (rev 11417)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java 2011-09-26 10:35:30 UTC (rev 11418)
@@ -13,10 +13,6 @@
package org.hornetq.tests.integration.cluster.failover;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
-import org.hornetq.tests.integration.cluster.util.TestableServer;
-
/**
* A NettyReplicatedFailoverTest
*
12 years, 9 months
JBoss hornetq SVN: r11417 - branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/asyncio/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-26 06:34:14 -0400 (Mon, 26 Sep 2011)
New Revision: 11417
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/asyncio/impl/HornetQFileLock.java
Log:
Fix compilation error against Java7.
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/asyncio/impl/HornetQFileLock.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/asyncio/impl/HornetQFileLock.java 2011-09-26 09:55:05 UTC (rev 11416)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/asyncio/impl/HornetQFileLock.java 2011-09-26 10:34:14 UTC (rev 11417)
@@ -14,6 +14,7 @@
package org.hornetq.core.asyncio.impl;
import java.io.IOException;
+import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
/**
@@ -27,7 +28,7 @@
{
private final int handle;
-
+
/**
* @param channel
* @param position
@@ -36,7 +37,7 @@
*/
protected HornetQFileLock(final int handle)
{
- super(null, 0, 0, false);
+ super((FileChannel)null, 0, 0, false);
this.handle = handle;
}
12 years, 9 months
JBoss hornetq SVN: r11416 - in branches/Branch_2_2_EAP: src/config/jboss-as-5/clustered and 7 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-09-26 05:55:05 -0400 (Mon, 26 Sep 2011)
New Revision: 11416
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java
Removed:
branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java
Modified:
branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-jboss-beans.xml
branches/Branch_2_2_EAP/src/config/jboss-as-5/non-clustered/hornetq-jboss-beans.xml
branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-jboss-beans.xml
branches/Branch_2_2_EAP/src/config/jboss-as-6/non-clustered/hornetq-jboss-beans.xml
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/Util.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java
Log:
https://issues.jboss.org/browse/HORNETQ-640 implemented XA recovery API
Deleted: branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml
===================================================================
--- branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml 2011-09-26 09:54:29 UTC (rev 11415)
+++ branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml 2011-09-26 09:55:05 UTC (rev 11416)
@@ -1,348 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<transaction-service>
- <properties name="documentation">
- <!--
- This is the JBossTS configuration file. Note that starting with JBossTS 4.6
- the names jbossjta-properties.xml and jbossjts-properties.xml are obsolete.
-
- Both the JTA and JTS versions of JBossTS now read their configuration from
- jbossts-properties.xml, although the contents of the file differs between
- the JTA and JTS. Care should be taken to use the correct version of the file.
-
- ***************************
-
- Property values may be literals or be tokens of the form ${p1[,p2][:v]}
- in which case the token values are substituted for the values of the corresponding system
- properties as follows:
-
- - Any occurance of ${p} with the System.getProperty(p) value.
- If there is no such property p defined, then the ${p} reference will remain unchanged.
-
- - If the property reference is of the form ${p:v} and there is no such property p,
- then the default value v will be returned.
-
- - If the property reference is of the form ${p1,p2} or ${p1,p2:v} then
- the primary and the secondary properties will be tried in turn, before
- returning either the unchanged input, or the default value.
-
- The property ${/} is replaced with System.getProperty("file.separator")
- value and the property ${:} is replaced with System.getProperty("path.separator").
-
- Note this substitution applies to property values only at the point they are read from
- the config file. Tokens in system properties won't be substituted.
- -->
- </properties>
- <properties depends="common" name="arjuna">
- <!--
- Transaction Reaper Timeout (default is 120000 ms).
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.txReaperTimeout" value="120000"/>
- <!--
- Transaction Reaper Mode, can be: PERIODIC or DYNAMIC. Default is DYNAMIC.
- -->
- <property name="com.arjuna.ats.arjuna.coordinator.txReaperMode" value="DYNAMIC"/>
- <!--
- Transaction Reaper Cancel Wait Period (default is 500 ms, min is 10 msecs).
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.txReaperCancelWaitPeriod" value="500"/>
- <!--
- Transaction Reaper Cancel Fail Wait Period (default is 500 ms, min is 10 msecs).
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.txReaperCancelFailWaitPeriod" value="500"/>
- <!--
- Transaction Reaper Zombie Max (default is 8).
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.txReaperZombieMax" value="8"/>
- <!--
- (default is NO)
- -->
- <property name="com.arjuna.ats.arjuna.coordinator.asyncCommit" value="NO"/>
- <!--
- (default is NO)
- -->
- <property name="com.arjuna.ats.arjuna.coordinator.asyncPrepare" value="NO"/>
- <!--
- (default is YES)
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.commitOnePhase" value="YES"/>
- <!--
- (default is defaultStore)
- -->
- <property name="com.arjuna.ats.arjuna.objectstore.localOSRoot" value="defaultStore"/>
- <!--
- default is under user.home - must be writeable!)
- -->
- <property
- name="com.arjuna.ats.arjuna.objectstore.objectStoreDir" value="PutObjectStoreDirHere"/>
- <!--
- (default is ON)
- -->
- <property
- name="com.arjuna.ats.arjuna.objectstore.objectStoreSync" value="ON"/>
- <!--
- (default is ShadowNoFileLockStore)
- -->
- <property
- name="com.arjuna.ats.arjuna.objectstore.objectStoreType" value="ShadowNoFileLockStore"/>
- <!--
- (default is 255)
- -->
- <property
- name="com.arjuna.ats.arjuna.objectstore.hashedDirectories" value="255"/>
- <!--
- (default is ON)
- -->
- <property
- name="com.arjuna.ats.arjuna.objectstore.transactionSync" value="ON"/>
- <!--
- (Must be unique across all Arjuna instances.)
- -->
- <property name="com.arjuna.ats.arjuna.xa.nodeIdentifier" value="1"/>
- <!--
- Base port number for determining a unique number to associate with an instance of the transaction service
- (which is needed in order to support multiple instances on the same machine).
- Use the value 0 to allow the system to select the first available port number.
- If the port number is non-zero and the port is in use then the value will be incremented until either a successful binding
- to the loopback address is created or until the the maximum number of ports (specified by the
- com.arjuna.ats.internal.arjuna.utils.SocketProcessIdMaxPorts property) have been tried or until the port number
- reaches the maximum possible port number.
- -->
- <property
- name="com.arjuna.ats.internal.arjuna.utils.SocketProcessIdPort" value="0"/>
- <!--
- The maximum number of ports to try starting from the value specified by the property
- com.arjuna.ats.internal.arjuna.utils.SocketProcessIdPort. Any non-numeric or value less than 1 will
- defautl to 1.
- -->
- <property
- name="com.arjuna.ats.internal.arjuna.utils.SocketProcessIdMaxPorts" value="1"/>
- <!--
- Run the TransactionStatusManager to allow out-of-process recovery managers to query
- the status of transactions owned by this coordinator. Default is YES.
- This can be set to NO in cases where an ObjectStore is used only by one transaction manager
- and the recovery manager for that store is in the same JVM. In any other cases disabling the
- TransactionStatusManager may cause crash recovery to misbehave.
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.transactionStatusManagerEnable" value="YES"/>
- <!-- property
- name="com.arjuna.ats.arjuna.coordinator.actionStore"
- value="HashedActionStore"
- value="JDBCActionStore"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.jdbcTxDbAccess"
- value="JDBCAccess"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.objectStoreType"
- value="ShadowNoFileLockStore"
- value="JDBCStore"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.jdbcUserDbAccess"
- value="JDBCAccess"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.jdbcPoolSizeInitial"
- value="1"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.jdbcPoolSizeMaximum"
- value="1"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.jdbcPoolPutConnections"
- value="false"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.internal.arjuna.objectstore.cacheStore.size"
- value=""
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.internal.arjuna.objectstore.cacheStore.period"
- value=""
- -->
- <!--
- The location for creating temporary files, e.g., Uids.
- Default is under user.home.
- IMPORTANT: make sure the directory is lockable, e.g., /tmp on Unix
- may not be!
- -->
- <!--
- <property
- name="com.arjuna.ats.arjuna.common.varDir"
- value="var"/>
- -->
- <!-- Should beforeCompletion synchronizations be fired even when it is known the
- transaction can't commit e.g. is marked rollbackOnly? (default NO).
- Note that turning this on still does not guarantee the syncronizations will run in all cases
- e.g. explicit rollback() calls. JTS users should also take into account the supportRollbackSync
- property which affects both beforeCompletion and afterCompletion syncs. -->
- <!--
- <property name="com.arjuna.ats.coordinator.beforeCompletionWhenRollbackOnly" value="NO"/>
- -->
- </properties>
- <properties name="common">
- <!-- CLF 2.0 properties -->
- <property name="com.arjuna.common.util.logging.DebugLevel"
- type="System" value="0x00000000"/>
- <property name="com.arjuna.common.util.logging.FacilityLevel"
- type="System" value="0xffffffff"/>
- <property name="com.arjuna.common.util.logging.VisibilityLevel"
- type="System" value="0xffffffff"/>
- <property name="com.arjuna.common.util.logger" type="System" value="log4j"/>
- </properties>
- <properties depends="arjuna" name="txoj">
- <!--
- (default is LockStore of installation - must be writeable!)
- -->
- <!--
- <property
- name="com.arjuna.ats.txoj.lockstore.lockStoreDir"
- value="LockStore"/>
- -->
- <!--
- (default is BasicLockStore)
- -->
- <property name="com.arjuna.ats.txoj.lockstore.lockStoreType" value="BasicLockStore"/>
- <!--
- (default is NO)
- -->
- <property name="com.arjuna.ats.txoj.lockstore.multipleLockStore" value="NO"/>
- <!--
- (default is YES)
- -->
- <property name="com.arjuna.ats.txoj.lockstore.singleLockStore" value="YES"/>
- <!--
- (default is YES)
- -->
- <property
- name="com.arjuna.ats.txoj.lockstore.allowNestedLocking" value="YES"/>
- </properties>
- <properties depends="arjuna" name="jta">
- <!--
- Support subtransactions in the JTA layer?
- Default is NO.
- -->
- <property name="com.arjuna.ats.jta.supportSubtransactions" value="NO"/>
- <property name="com.arjuna.ats.jta.jtaTMImplementation" value="com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple"/>
- <!--
- com.arjuna.ats.internal.jta.transaction.jts.TransactionManagerImple
- -->
- <property name="com.arjuna.ats.jta.jtaUTImplementation" value="com.arjuna.ats.internal.jta.transaction.arjunacore.UserTransactionImple"/>
- <!--
- com.arjuna.ats.internal.jta.transaction.jts.UserTransactionImple
- -->
- <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
- value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"/>
-
- <!--you'll need something like this if the HornetQ Server is remote-->
- <!--
- <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
- value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/>-->
-
- <!--you'll need something like this if the HornetQ Server is remote and has failover configured-->
- <!--
- <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
- value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/>-->
-
-
- <property name="com.arjuna.ats.jta.xaRecoveryNode" value="1"/>
- </properties>
- <properties depends="arjuna,txoj,jta" name="recoverymanager">
- <!--
- Properties used only by the RecoveryManager.
- -->
- <!--
- Periodic recovery settings.
- Time values in this section are in seconds.
- -->
- <!--
- Interval in seconds between initiating the periodic recovery modules.
- Default is 120 seconds.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.periodicRecoveryPeriod" value="120"/>
- <!--
- Interval in seconds between first and second pass of periodic recovery.
- Default is 10 seconds.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.recoveryBackoffPeriod" value="10"/>
- <!--
- The port number on which the recovery manager listens.
- -->
- <property name="com.arjuna.ats.arjuna.recovery.recoveryPort" value="4712"/>
- <!--
- The address on which the recovery manager listens.
- If running within an AS then the address the AS is bound to (jboss.bind.address) takes precedence
- -->
- <property name="com.arjuna.ats.arjuna.recovery.recoveryAddress" value=""/>
- <!--
- Periodic recovery modules to use. Invoked in sort-order of names.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.recoveryExtension1" value="com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule"/>
- <property
- name="com.arjuna.ats.arjuna.recovery.recoveryExtension2" value="com.arjuna.ats.internal.txoj.recovery.TORecoveryModule"/>
- <property
- name="com.arjuna.ats.arjuna.recovery.recoveryExtension3" value="com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule"/>
- <!--
- Expired entry removal
- -->
- <!--
- Expiry scanners to use (order of invocation is random).
- Names must begin with "com.arjuna.ats.arjuna.recovery.expiryScanner"
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.expiryScannerTransactionStatusManager" value="com.arjuna.ats.internal.arjuna.recovery.ExpiredTransactionStatusManagerScanner"/>
- <!--
- Interval, in hours, between running the expiry scanners.
- This can be quite long. The absolute value determines the interval -
- if the value is negative, the scan will NOT be run until after one
- interval has elapsed. If positive the first scan will be immediately
- after startup. Zero will prevent any scanning.
- Default = 12 = run immediately, then every 12 hours.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.expiryScanInterval" value="12"/>
- <!--
- This is the interval, in hours, after which a process that cannot be contacted will be considered dead.
- It should be long enough to avoid accidentally removing valid entries due to short lived
- transient errors such as network downtime. Zero = Never removed. Default is 12.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.transactionStatusManagerExpiryTime" value="12"/>
- <!--
- Use this to fix the port on which the TransactionStatusManager listens,
- The default behaviour is to use any free port.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.transactionStatusManagerPort" value="0"/>
- <!--
- Use this to fix the address on which the TransactionStatusManager binds,
- The default behaviour is to use the loopback address (ie localhost).
- If running within an AS then the address the AS is bound to (jboss.bind.address) takes precedence
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.transactionStatusManagerAddress" value=""/>
- <!--
- For cases where the recovery manager is in process with the transaction manager and nothing else uses
- the ObjectStore, it is possible to disable the socket based recovery listener by setting this to NO.
- Caution: use of this property can allow multiple recovery processes to run on the same ObjectStore
- if you are not careful. That in turn can lead to incorrect transaction processing. Use with care.
- -->
- <property name="com.arjuna.ats.arjuna.recovery.recoveryListener" value="YES"/>
- </properties>
- <properties depends="jta" name="jdbc">
- <!--
- property name="com.arjuna.ats.jdbc.isolationLevel" value="TRANSACTION_SERIALIZABLE"/>
- -->
- </properties>
-</transaction-service>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-jboss-beans.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-jboss-beans.xml 2011-09-26 09:54:29 UTC (rev 11415)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-jboss-beans.xml 2011-09-26 09:55:05 UTC (rev 11416)
@@ -58,4 +58,13 @@
</constructor>
<depends>HornetQServer</depends>
</bean>
+
+ <bean name="AS5RecoveryRegistry" class="org.hornetq.jms.server.recovery.AS5RecoveryRegistry">
+ <constructor>
+ <parameter>
+ <inject bean="TransactionManager"/>
+ </parameter>
+ </constructor>
+ </bean>
+
</deployment>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-5/non-clustered/hornetq-jboss-beans.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-5/non-clustered/hornetq-jboss-beans.xml 2011-09-26 09:54:29 UTC (rev 11415)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-5/non-clustered/hornetq-jboss-beans.xml 2011-09-26 09:55:05 UTC (rev 11416)
@@ -58,5 +58,13 @@
</constructor>
<depends>HornetQServer</depends>
</bean>
+
+ <bean name="AS5RecoveryRegistry" class="org.hornetq.jms.server.recovery.AS5RecoveryRegistry">
+ <constructor>
+ <parameter>
+ <inject bean="TransactionManager"/>
+ </parameter>
+ </constructor>
+ </bean>
</deployment>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-jboss-beans.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-jboss-beans.xml 2011-09-26 09:54:29 UTC (rev 11415)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-jboss-beans.xml 2011-09-26 09:55:05 UTC (rev 11416)
@@ -59,4 +59,12 @@
<depends>HornetQServer</depends>
</bean>
+ <bean name="AS5RecoveryRegistry" class="org.jboss.as.integration.hornetq.recovery.AS5RecoveryRegistry">
+ <constructor>
+ <parameter>
+ <inject bean="TransactionManager"/>
+ </parameter>
+ </constructor>
+ </bean>
+
</deployment>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-6/non-clustered/hornetq-jboss-beans.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-6/non-clustered/hornetq-jboss-beans.xml 2011-09-26 09:54:29 UTC (rev 11415)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-6/non-clustered/hornetq-jboss-beans.xml 2011-09-26 09:55:05 UTC (rev 11416)
@@ -58,4 +58,12 @@
</constructor>
<depends>HornetQServer</depends>
</bean>
+
+ <bean name="AS5RecoveryRegistry" class="org.jboss.as.integration.hornetq.recovery.AS5RecoveryRegistry">
+ <constructor>
+ <parameter>
+ <inject bean="TransactionManager"/>
+ </parameter>
+ </constructor>
+ </bean>
</deployment>
Added: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java 2011-09-26 09:55:05 UTC (rev 11416)
@@ -0,0 +1,64 @@
+/*
+* JBoss, Home of Professional Open Source.
+* Copyright 2010, Red Hat, Inc., and individual contributors
+* as indicated by the @author tags. See the copyright.txt file in the
+* distribution for a full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.hornetq.jms.server.recovery;
+
+import org.jboss.tm.XAResourceRecovery;
+
+import javax.transaction.xa.XAResource;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * 9/20/11
+ */
+public class HornetQResourceRecovery implements XAResourceRecovery
+{
+ private final XARecoveryConfig config;
+
+ public HornetQResourceRecovery(XARecoveryConfig config)
+ {
+ this.config = config;
+ }
+
+ public XAResource[] getXAResources()
+ {
+ return new XAResource[]{new HornetQXAResourceWrapper(config)};
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ HornetQResourceRecovery that = (HornetQResourceRecovery) o;
+
+ if (config != null ? !config.equals(that.config) : that.config != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return config != null ? config.hashCode() : 0;
+ }
+}
Deleted: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java 2011-09-26 09:54:29 UTC (rev 11415)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java 2011-09-26 09:55:05 UTC (rev 11416)
@@ -1,224 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.server.recovery;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.transaction.xa.XAResource;
-
-import com.arjuna.ats.jta.recovery.XAResourceRecovery;
-
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.logging.Logger;
-
-/**
- *
- * A XAResourceRecovery instance that can be used to recover any JMS provider.
- *
- * In reality only recover,rollback and commit will be called but we still need to
- * be implement all methods just in case.
- *
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class HornetQXAResourceRecovery implements XAResourceRecovery
-{
- private final boolean trace = HornetQXAResourceRecovery.log.isTraceEnabled();
-
- private static final Logger log = Logger.getLogger(HornetQXAResourceRecovery.class);
-
- private boolean hasMore;
-
- private HornetQXAResourceWrapper res;
-
- public HornetQXAResourceRecovery()
- {
- if (trace)
- {
- HornetQXAResourceRecovery.log.trace("Constructing HornetQXAResourceRecovery");
- }
- }
-
- public boolean initialise(final String config)
- {
- if (HornetQXAResourceRecovery.log.isTraceEnabled())
- {
- HornetQXAResourceRecovery.log.trace(this + " intialise: " + config);
- }
-
- String[] configs = config.split(";");
- XARecoveryConfig[] xaRecoveryConfigs = new XARecoveryConfig[configs.length];
- for (int i = 0, configsLength = configs.length; i < configsLength; i++)
- {
- String s = configs[i];
- ConfigParser parser = new ConfigParser(s);
- String connectorFactoryClassName = parser.getConnectorFactoryClassName();
- Map<String, Object> connectorParams = parser.getConnectorParameters();
- String username = parser.getUsername();
- String password = parser.getPassword();
- TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams);
- xaRecoveryConfigs[i] = new XARecoveryConfig(transportConfiguration, username, password);
- }
-
-
-
- res = new HornetQXAResourceWrapper(xaRecoveryConfigs);
-
- if (HornetQXAResourceRecovery.log.isTraceEnabled())
- {
- HornetQXAResourceRecovery.log.trace(this + " initialised");
- }
-
- return true;
- }
-
- public boolean hasMoreResources()
- {
- if (HornetQXAResourceRecovery.log.isTraceEnabled())
- {
- HornetQXAResourceRecovery.log.trace(this + " hasMoreResources");
- }
-
- /*
- * The way hasMoreResources is supposed to work is as follows:
- * For each "sweep" the recovery manager will call hasMoreResources, then if it returns
- * true it will call getXAResource.
- * It will repeat that until hasMoreResources returns false.
- * Then the sweep is over.
- * For the next sweep hasMoreResources should return true, etc.
- *
- * In our case where we only need to return one XAResource per sweep,
- * hasMoreResources should basically alternate between true and false.
- *
- *
- */
-
- hasMore = !hasMore;
-
- return hasMore;
- }
-
- public XAResource getXAResource()
- {
- if (HornetQXAResourceRecovery.log.isTraceEnabled())
- {
- HornetQXAResourceRecovery.log.trace(this + " getXAResource");
- }
-
- return res;
- }
-
- public XAResource[] getXAResources()
- {
- return new XAResource[] { res };
- }
-
- @Override
- protected void finalize()
- {
- res.close();
- }
-
- public static class ConfigParser
- {
- private final String connectorFactoryClassName;
-
- private final Map<String, Object> connectorParameters;
-
- private String username;
-
- private String password;
-
- public ConfigParser(final String config)
- {
- if (config == null || config.length() == 0)
- {
- throw new IllegalArgumentException("Must specify provider connector factory class name in config");
- }
-
- String[] strings = config.split(",");
-
- // First (mandatory) param is the connector factory class name
- if (strings.length < 1)
- {
- throw new IllegalArgumentException("Must specify provider connector factory class name in config");
- }
-
- connectorFactoryClassName = strings[0].trim();
-
- // Next two (optional) parameters are the username and password to use for creating the session for recovery
-
- if (strings.length >= 2)
- {
-
- username = strings[1].trim();
- if (username.length() == 0)
- {
- username = null;
- }
-
- if (strings.length == 2)
- {
- throw new IllegalArgumentException("If username is specified, password must be specified too");
- }
-
- password = strings[2].trim();
- if (password.length() == 0)
- {
- password = null;
- }
- }
-
- // other tokens are for connector configurations
- connectorParameters = new HashMap<String, Object>();
- if (strings.length >= 3)
- {
- for (int i = 3; i < strings.length; i++)
- {
- String[] str = strings[i].split("=");
- if (str.length == 2)
- {
- connectorParameters.put(str[0].trim(), str[1].trim());
- }
- }
- }
- }
-
- public String getConnectorFactoryClassName()
- {
- return connectorFactoryClassName;
- }
-
- public Map<String, Object> getConnectorParameters()
- {
- return connectorParameters;
- }
-
- public String getUsername()
- {
- return username;
- }
-
- public String getPassword()
- {
- return password;
- }
- }
-}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2011-09-26 09:54:29 UTC (rev 11415)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2011-09-26 09:55:05 UTC (rev 11416)
@@ -60,7 +60,7 @@
private XARecoveryConfig[] xaRecoveryConfigs;
- private TransportConfiguration currentConnection;
+ //private TransportConfiguration currentConnection;
public HornetQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs)
{
@@ -71,7 +71,7 @@
public Xid[] recover(final int flag) throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("Recover " + currentConnection);
+ HornetQXAResourceWrapper.log.debug("Recover " + xaResource);
try
{
return xaResource.recover(flag);
@@ -85,7 +85,7 @@
public void commit(final Xid xid, final boolean onePhase) throws XAException
{
XAResource xaResource = getDelegate(true);
- HornetQXAResourceWrapper.log.debug("Commit " + currentConnection + " xid " + " onePhase=" + onePhase);
+ HornetQXAResourceWrapper.log.debug("Commit " + xaResource + " xid " + " onePhase=" + onePhase);
try
{
xaResource.commit(xid, onePhase);
@@ -99,7 +99,7 @@
public void rollback(final Xid xid) throws XAException
{
XAResource xaResource = getDelegate(true);
- HornetQXAResourceWrapper.log.debug("Rollback " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("Rollback " + xaResource + " xid ");
try
{
xaResource.rollback(xid);
@@ -113,7 +113,7 @@
public void forget(final Xid xid) throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("Forget " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("Forget " + xaResource + " xid ");
try
{
xaResource.forget(xid);
@@ -145,7 +145,7 @@
public int prepare(final Xid xid) throws XAException
{
XAResource xaResource = getDelegate(true);
- HornetQXAResourceWrapper.log.debug("prepare " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("prepare " + xaResource + " xid ");
try
{
return xaResource.prepare(xid);
@@ -159,7 +159,7 @@
public void start(final Xid xid, final int flags) throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("start " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("start " + xaResource + " xid ");
try
{
xaResource.start(xid, flags);
@@ -173,7 +173,7 @@
public void end(final Xid xid, final int flags) throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("end " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("end " + xaResource + " xid ");
try
{
xaResource.end(xid, flags);
@@ -187,7 +187,7 @@
public int getTransactionTimeout() throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("getTransactionTimeout " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("getTransactionTimeout " + xaResource + " xid ");
try
{
return xaResource.getTransactionTimeout();
@@ -201,7 +201,7 @@
public boolean setTransactionTimeout(final int seconds) throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("setTransactionTimeout " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("setTransactionTimeout " + xaResource + " xid ");
try
{
return xaResource.setTransactionTimeout(seconds);
@@ -214,7 +214,7 @@
public void connectionFailed(final HornetQException me, boolean failedOver)
{
- HornetQXAResourceWrapper.log.warn("Notified of connection failure in xa recovery connectionFactory for provider " + currentConnection + " will attempt reconnect on next pass",
+ HornetQXAResourceWrapper.log.warn("Notified of connection failure in xa recovery connectionFactory for provider " + csf + " will attempt reconnect on next pass",
me);
close();
}
@@ -299,7 +299,7 @@
try
{
- serverLocator = HornetQClient.createServerLocatorWithoutHA(xaRecoveryConfig.getTransportConfiguration());
+ serverLocator = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator();
serverLocator.disableFinalizeCheck();
csf = serverLocator.createSessionFactory();
if (xaRecoveryConfig.getUsername() == null)
@@ -320,12 +320,10 @@
synchronized (HornetQXAResourceWrapper.lock)
{
delegate = cs;
- currentConnection = xaRecoveryConfig.getTransportConfiguration();
}
return delegate;
}
- currentConnection = null;
throw new HornetQException(HornetQException.NOT_CONNECTED);
}
Added: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java 2011-09-26 09:55:05 UTC (rev 11416)
@@ -0,0 +1,36 @@
+/*
+* JBoss, Home of Professional Open Source.
+* Copyright 2010, Red Hat, Inc., and individual contributors
+* as indicated by the @author tags. See the copyright.txt file in the
+* distribution for a full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.hornetq.jms.server.recovery;
+
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.ra.HornetQRAConnectionFactory;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * 9/20/11
+ */
+public interface RecoveryRegistry
+{
+ void register(HornetQResourceRecovery resourceRecovery);
+
+ void unRegister(HornetQResourceRecovery xaRecoveryConfig);
+}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2011-09-26 09:54:29 UTC (rev 11415)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2011-09-26 09:55:05 UTC (rev 11416)
@@ -13,7 +13,7 @@
package org.hornetq.jms.server.recovery;
-import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.jms.client.HornetQConnectionFactory;
/**
* @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
@@ -24,20 +24,20 @@
*/
public class XARecoveryConfig
{
- private final TransportConfiguration transportConfiguration;
+ private final HornetQConnectionFactory hornetQConnectionFactory;
private final String username;
private final String password;
- public XARecoveryConfig(TransportConfiguration transportConfiguration, String username, String password)
+ public XARecoveryConfig(HornetQConnectionFactory hornetQConnectionFactory, String username, String password)
{
- this.transportConfiguration = transportConfiguration;
+ this.hornetQConnectionFactory = hornetQConnectionFactory;
this.username = username;
this.password = password;
}
- public TransportConfiguration getTransportConfiguration()
+ public HornetQConnectionFactory getHornetQConnectionFactory()
{
- return transportConfiguration;
+ return hornetQConnectionFactory;
}
public String getUsername()
@@ -49,4 +49,29 @@
{
return password;
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ XARecoveryConfig that = (XARecoveryConfig) o;
+
+ if (hornetQConnectionFactory != null ? !hornetQConnectionFactory.equals(that.hornetQConnectionFactory) : that.hornetQConnectionFactory != null)
+ return false;
+ if (password != null ? !password.equals(that.password) : that.password != null) return false;
+ if (username != null ? !username.equals(that.username) : that.username != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = hornetQConnectionFactory != null ? hornetQConnectionFactory.hashCode() : 0;
+ result = 31 * result + (username != null ? username.hashCode() : 0);
+ result = 31 * result + (password != null ? password.hashCode() : 0);
+ return result;
+ }
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnection.java 2011-09-26 09:54:29 UTC (rev 11415)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnection.java 2011-09-26 09:55:05 UTC (rev 11416)
@@ -249,6 +249,8 @@
HornetQRAManagedConnection.log.debug("Error unsetting the exception listener " + this, e);
}
+ mcf.stop();
+
destroyHandles();
try
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2011-09-26 09:54:29 UTC (rev 11415)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2011-09-26 09:55:05 UTC (rev 11416)
@@ -30,6 +30,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.recovery.HornetQResourceRecovery;
/**
* HornetQ ManagedConectionFactory
@@ -76,6 +77,11 @@
*/
private HornetQConnectionFactory connectionFactory;
+ /*
+ * The resource recovery if there is one
+ * */
+ private HornetQResourceRecovery resourceRecovery;
+
/**
* Constructor
*/
@@ -747,6 +753,7 @@
if (connectionFactory == null)
{
connectionFactory = ra.createHornetQConnectionFactory(mcfProperties);
+ resourceRecovery = ra.getRecoveryManager().register(connectionFactory, null, null);
}
return connectionFactory;
}
@@ -791,4 +798,12 @@
return info;
}
}
+
+ public void stop()
+ {
+ if(resourceRecovery != null)
+ {
+ ra.getRecoveryManager().unRegister(resourceRecovery);
+ }
+ }
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-09-26 09:54:29 UTC (rev 11415)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-09-26 09:55:05 UTC (rev 11416)
@@ -41,8 +41,11 @@
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.recovery.RecoveryRegistry;
+import org.hornetq.jms.server.recovery.XARecoveryConfig;
import org.hornetq.ra.inflow.HornetQActivation;
import org.hornetq.ra.inflow.HornetQActivationSpec;
+import org.hornetq.ra.recovery.RecoveryManager;
/**
* The resource adapter for HornetQ
@@ -106,6 +109,8 @@
private String unparsedJndiParams;
+ RecoveryManager recoveryManager;
+
/**
* Constructor
*/
@@ -119,6 +124,7 @@
raProperties = new HornetQRAProperties();
configured = new AtomicBoolean(false);
activations = new ConcurrentHashMap<ActivationSpec, HornetQActivation>();
+ recoveryManager = new RecoveryManager();
}
public TransactionManager getTM()
@@ -209,6 +215,8 @@
locateTM();
+ recoveryManager.start();
+
this.ctx = ctx;
HornetQResourceAdapter.log.info("HornetQ resource adaptor started");
@@ -241,8 +249,12 @@
if (defaultHornetQConnectionFactory != null)
{
defaultHornetQConnectionFactory.close();
+
+ XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(defaultHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
}
+ recoveryManager.stop();
+
HornetQResourceAdapter.log.info("HornetQ resource adapter stopped");
}
@@ -1317,6 +1329,13 @@
}
+
+
+ public RecoveryManager getRecoveryManager()
+ {
+ return recoveryManager;
+ }
+
/**
* Get the resource adapter properties
*
@@ -1338,6 +1357,7 @@
protected void setup() throws HornetQException
{
defaultHornetQConnectionFactory = createHornetQConnectionFactory(raProperties);
+ recoveryManager.register(defaultHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
}
public Map<ActivationSpec, HornetQActivation> getActivations()
@@ -1375,7 +1395,7 @@
{
ha = HornetQClient.DEFAULT_IS_HA;
}
-
+
if (connectorClassName != null)
{
TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/Util.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/Util.java 2011-09-26 09:54:29 UTC (rev 11415)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/Util.java 2011-09-26 09:55:05 UTC (rev 11416)
@@ -19,6 +19,7 @@
import javax.transaction.TransactionManager;
import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.server.recovery.RecoveryRegistry;
/**
* Various utility functions
@@ -269,5 +270,19 @@
}
}
-
+ public static RecoveryRegistry locateRecoveryRegistry(final String locatorClass)
+ {
+ try
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Class<?> aClass = loader.loadClass(locatorClass);
+ Object o = aClass.newInstance();
+ return (RecoveryRegistry)o;
+ }
+ catch (Throwable e)
+ {
+ log.debug(e.getMessage(), e);
+ return null;
+ }
+ }
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2011-09-26 09:54:29 UTC (rev 11415)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2011-09-26 09:55:05 UTC (rev 11416)
@@ -37,6 +37,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.server.recovery.HornetQResourceRecovery;
import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.ra.Util;
@@ -106,7 +107,8 @@
// Whether we are in the failure recovery loop
private AtomicBoolean inFailure = new AtomicBoolean(false);
-
+ private HornetQResourceRecovery resourceRecovery;
+
static
{
try
@@ -314,6 +316,11 @@
{
HornetQActivation.log.debug("Tearing down " + spec);
+ if(resourceRecovery != null)
+ {
+ ra.getRecoveryManager().unRegister(resourceRecovery);
+ }
+
for (HornetQMessageHandler handler : handlers)
{
handler.teardown();
@@ -331,6 +338,7 @@
if (spec.isHasBeenUpdated())
{
factory = ra.createHornetQConnectionFactory(spec);
+ resourceRecovery = ra.getRecoveryManager().register(factory, spec.getUser(), spec.getPassword());
}
else
{
Added: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/recovery/RecoveryManager.java 2011-09-26 09:55:05 UTC (rev 11416)
@@ -0,0 +1,162 @@
+/*
+* JBoss, Home of Professional Open Source.
+* Copyright 2010, Red Hat, Inc., and individual contributors
+* as indicated by the @author tags. See the copyright.txt file in the
+* distribution for a full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.hornetq.ra.recovery;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.recovery.HornetQResourceRecovery;
+import org.hornetq.jms.server.recovery.RecoveryRegistry;
+import org.hornetq.jms.server.recovery.XARecoveryConfig;
+import org.hornetq.ra.Util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * 9/21/11
+ */
+public class RecoveryManager
+{
+ private static Logger log = Logger.getLogger(RecoveryManager.class);
+
+ private RecoveryRegistry registry;
+
+ private String resourceRecoveryClassNames = "org.jboss.as.integration.hornetq.recovery.AS5RecoveryRegistry";
+
+ private Map<XARecoveryConfig, HornetQResourceRecovery> configMap = new HashMap<XARecoveryConfig, HornetQResourceRecovery>();
+
+ public void start()
+ {
+ locateRecoveryRegistry();
+ }
+
+ public HornetQResourceRecovery register(HornetQConnectionFactory factory, String userName, String password)
+ {
+ if(!isRegistered(factory))
+ {
+ XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(factory, userName, password);
+ HornetQResourceRecovery resourceRecovery = new HornetQResourceRecovery(xaRecoveryConfig);
+ registry.register(resourceRecovery);
+ configMap.put(xaRecoveryConfig, resourceRecovery);
+ return resourceRecovery;
+ }
+ return null;
+ }
+
+ public void unRegister(HornetQResourceRecovery resourceRecovery)
+ {
+ registry.unRegister(resourceRecovery);
+ }
+
+ public void stop()
+ {
+ for (HornetQResourceRecovery hornetQResourceRecovery : configMap.values())
+ {
+ registry.unRegister(hornetQResourceRecovery);
+ }
+ configMap.clear();
+ }
+
+ private void locateRecoveryRegistry()
+ {
+ String locatorClasses[] = resourceRecoveryClassNames.split(";");
+
+ for (int i = 0 ; i < locatorClasses.length; i++)
+ {
+ registry = Util.locateRecoveryRegistry(locatorClasses[i]);
+ if (registry != null)
+ {
+ break;
+ }
+ }
+
+ if (registry == null)
+ {
+ registry = new RecoveryRegistry()
+ {
+ public void register(HornetQResourceRecovery resourceRecovery)
+ {
+ //no op
+ }
+
+ public void unRegister(HornetQResourceRecovery xaRecoveryConfig)
+ {
+ //no op
+ }
+ };
+ }
+ else
+ {
+ log.debug("Recovery Registry located = " + registry);
+ }
+ }
+
+
+ public boolean isRegistered(HornetQConnectionFactory factory)
+ {
+ for (XARecoveryConfig xaRecoveryConfig : configMap.keySet())
+ {
+ TransportConfiguration[] transportConfigurations = factory.getServerLocator().getStaticTransportConfigurations();
+
+ if (transportConfigurations != null)
+ {
+ TransportConfiguration[] xaConfigurations = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator().getStaticTransportConfigurations();
+ if(xaConfigurations == null)
+ {
+ break;
+ }
+ if(transportConfigurations.length != xaConfigurations.length)
+ {
+ break;
+ }
+ boolean theSame=true;
+ for(int i = 0; i < transportConfigurations.length; i++)
+ {
+ TransportConfiguration tc = transportConfigurations[i];
+ TransportConfiguration xaTc = xaConfigurations[i];
+ if(!tc.equals(xaTc))
+ {
+ theSame = false;
+ break;
+ }
+ }
+ if(theSame)
+ {
+ return theSame;
+ }
+ }
+ else
+ {
+ DiscoveryGroupConfiguration discoveryGroupConfiguration = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator().getDiscoveryGroupConfiguration();
+ if(discoveryGroupConfiguration != null && discoveryGroupConfiguration.equals(factory.getDiscoveryGroupConfiguration()))
+ {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+}
12 years, 9 months
JBoss hornetq SVN: r11415 - in branches/Branch_2_2_AS7: src/main/org/hornetq/jms/server/recovery and 3 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-09-26 05:54:29 -0400 (Mon, 26 Sep 2011)
New Revision: 11415
Added:
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java
Removed:
branches/Branch_2_2_AS7/examples/javaee/xarecovery/server/jbossts-properties.xml
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQResourceAdapter.java
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/Util.java
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/inflow/HornetQActivation.java
Log:
https://issues.jboss.org/browse/HORNETQ-640 implemented XA recovery API
Deleted: branches/Branch_2_2_AS7/examples/javaee/xarecovery/server/jbossts-properties.xml
===================================================================
--- branches/Branch_2_2_AS7/examples/javaee/xarecovery/server/jbossts-properties.xml 2011-09-26 05:32:50 UTC (rev 11414)
+++ branches/Branch_2_2_AS7/examples/javaee/xarecovery/server/jbossts-properties.xml 2011-09-26 09:54:29 UTC (rev 11415)
@@ -1,348 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<transaction-service>
- <properties name="documentation">
- <!--
- This is the JBossTS configuration file. Note that starting with JBossTS 4.6
- the names jbossjta-properties.xml and jbossjts-properties.xml are obsolete.
-
- Both the JTA and JTS versions of JBossTS now read their configuration from
- jbossts-properties.xml, although the contents of the file differs between
- the JTA and JTS. Care should be taken to use the correct version of the file.
-
- ***************************
-
- Property values may be literals or be tokens of the form ${p1[,p2][:v]}
- in which case the token values are substituted for the values of the corresponding system
- properties as follows:
-
- - Any occurance of ${p} with the System.getProperty(p) value.
- If there is no such property p defined, then the ${p} reference will remain unchanged.
-
- - If the property reference is of the form ${p:v} and there is no such property p,
- then the default value v will be returned.
-
- - If the property reference is of the form ${p1,p2} or ${p1,p2:v} then
- the primary and the secondary properties will be tried in turn, before
- returning either the unchanged input, or the default value.
-
- The property ${/} is replaced with System.getProperty("file.separator")
- value and the property ${:} is replaced with System.getProperty("path.separator").
-
- Note this substitution applies to property values only at the point they are read from
- the config file. Tokens in system properties won't be substituted.
- -->
- </properties>
- <properties depends="common" name="arjuna">
- <!--
- Transaction Reaper Timeout (default is 120000 ms).
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.txReaperTimeout" value="120000"/>
- <!--
- Transaction Reaper Mode, can be: PERIODIC or DYNAMIC. Default is DYNAMIC.
- -->
- <property name="com.arjuna.ats.arjuna.coordinator.txReaperMode" value="DYNAMIC"/>
- <!--
- Transaction Reaper Cancel Wait Period (default is 500 ms, min is 10 msecs).
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.txReaperCancelWaitPeriod" value="500"/>
- <!--
- Transaction Reaper Cancel Fail Wait Period (default is 500 ms, min is 10 msecs).
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.txReaperCancelFailWaitPeriod" value="500"/>
- <!--
- Transaction Reaper Zombie Max (default is 8).
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.txReaperZombieMax" value="8"/>
- <!--
- (default is NO)
- -->
- <property name="com.arjuna.ats.arjuna.coordinator.asyncCommit" value="NO"/>
- <!--
- (default is NO)
- -->
- <property name="com.arjuna.ats.arjuna.coordinator.asyncPrepare" value="NO"/>
- <!--
- (default is YES)
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.commitOnePhase" value="YES"/>
- <!--
- (default is defaultStore)
- -->
- <property name="com.arjuna.ats.arjuna.objectstore.localOSRoot" value="defaultStore"/>
- <!--
- default is under user.home - must be writeable!)
- -->
- <property
- name="com.arjuna.ats.arjuna.objectstore.objectStoreDir" value="PutObjectStoreDirHere"/>
- <!--
- (default is ON)
- -->
- <property
- name="com.arjuna.ats.arjuna.objectstore.objectStoreSync" value="ON"/>
- <!--
- (default is ShadowNoFileLockStore)
- -->
- <property
- name="com.arjuna.ats.arjuna.objectstore.objectStoreType" value="ShadowNoFileLockStore"/>
- <!--
- (default is 255)
- -->
- <property
- name="com.arjuna.ats.arjuna.objectstore.hashedDirectories" value="255"/>
- <!--
- (default is ON)
- -->
- <property
- name="com.arjuna.ats.arjuna.objectstore.transactionSync" value="ON"/>
- <!--
- (Must be unique across all Arjuna instances.)
- -->
- <property name="com.arjuna.ats.arjuna.xa.nodeIdentifier" value="1"/>
- <!--
- Base port number for determining a unique number to associate with an instance of the transaction service
- (which is needed in order to support multiple instances on the same machine).
- Use the value 0 to allow the system to select the first available port number.
- If the port number is non-zero and the port is in use then the value will be incremented until either a successful binding
- to the loopback address is created or until the the maximum number of ports (specified by the
- com.arjuna.ats.internal.arjuna.utils.SocketProcessIdMaxPorts property) have been tried or until the port number
- reaches the maximum possible port number.
- -->
- <property
- name="com.arjuna.ats.internal.arjuna.utils.SocketProcessIdPort" value="0"/>
- <!--
- The maximum number of ports to try starting from the value specified by the property
- com.arjuna.ats.internal.arjuna.utils.SocketProcessIdPort. Any non-numeric or value less than 1 will
- defautl to 1.
- -->
- <property
- name="com.arjuna.ats.internal.arjuna.utils.SocketProcessIdMaxPorts" value="1"/>
- <!--
- Run the TransactionStatusManager to allow out-of-process recovery managers to query
- the status of transactions owned by this coordinator. Default is YES.
- This can be set to NO in cases where an ObjectStore is used only by one transaction manager
- and the recovery manager for that store is in the same JVM. In any other cases disabling the
- TransactionStatusManager may cause crash recovery to misbehave.
- -->
- <property
- name="com.arjuna.ats.arjuna.coordinator.transactionStatusManagerEnable" value="YES"/>
- <!-- property
- name="com.arjuna.ats.arjuna.coordinator.actionStore"
- value="HashedActionStore"
- value="JDBCActionStore"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.jdbcTxDbAccess"
- value="JDBCAccess"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.objectStoreType"
- value="ShadowNoFileLockStore"
- value="JDBCStore"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.jdbcUserDbAccess"
- value="JDBCAccess"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.jdbcPoolSizeInitial"
- value="1"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.jdbcPoolSizeMaximum"
- value="1"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.objectstore.jdbcPoolPutConnections"
- value="false"
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.internal.arjuna.objectstore.cacheStore.size"
- value=""
- -->
- <!-- property
- name="com.arjuna.ats.arjuna.internal.arjuna.objectstore.cacheStore.period"
- value=""
- -->
- <!--
- The location for creating temporary files, e.g., Uids.
- Default is under user.home.
- IMPORTANT: make sure the directory is lockable, e.g., /tmp on Unix
- may not be!
- -->
- <!--
- <property
- name="com.arjuna.ats.arjuna.common.varDir"
- value="var"/>
- -->
- <!-- Should beforeCompletion synchronizations be fired even when it is known the
- transaction can't commit e.g. is marked rollbackOnly? (default NO).
- Note that turning this on still does not guarantee the syncronizations will run in all cases
- e.g. explicit rollback() calls. JTS users should also take into account the supportRollbackSync
- property which affects both beforeCompletion and afterCompletion syncs. -->
- <!--
- <property name="com.arjuna.ats.coordinator.beforeCompletionWhenRollbackOnly" value="NO"/>
- -->
- </properties>
- <properties name="common">
- <!-- CLF 2.0 properties -->
- <property name="com.arjuna.common.util.logging.DebugLevel"
- type="System" value="0x00000000"/>
- <property name="com.arjuna.common.util.logging.FacilityLevel"
- type="System" value="0xffffffff"/>
- <property name="com.arjuna.common.util.logging.VisibilityLevel"
- type="System" value="0xffffffff"/>
- <property name="com.arjuna.common.util.logger" type="System" value="log4j"/>
- </properties>
- <properties depends="arjuna" name="txoj">
- <!--
- (default is LockStore of installation - must be writeable!)
- -->
- <!--
- <property
- name="com.arjuna.ats.txoj.lockstore.lockStoreDir"
- value="LockStore"/>
- -->
- <!--
- (default is BasicLockStore)
- -->
- <property name="com.arjuna.ats.txoj.lockstore.lockStoreType" value="BasicLockStore"/>
- <!--
- (default is NO)
- -->
- <property name="com.arjuna.ats.txoj.lockstore.multipleLockStore" value="NO"/>
- <!--
- (default is YES)
- -->
- <property name="com.arjuna.ats.txoj.lockstore.singleLockStore" value="YES"/>
- <!--
- (default is YES)
- -->
- <property
- name="com.arjuna.ats.txoj.lockstore.allowNestedLocking" value="YES"/>
- </properties>
- <properties depends="arjuna" name="jta">
- <!--
- Support subtransactions in the JTA layer?
- Default is NO.
- -->
- <property name="com.arjuna.ats.jta.supportSubtransactions" value="NO"/>
- <property name="com.arjuna.ats.jta.jtaTMImplementation" value="com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple"/>
- <!--
- com.arjuna.ats.internal.jta.transaction.jts.TransactionManagerImple
- -->
- <property name="com.arjuna.ats.jta.jtaUTImplementation" value="com.arjuna.ats.internal.jta.transaction.arjunacore.UserTransactionImple"/>
- <!--
- com.arjuna.ats.internal.jta.transaction.jts.UserTransactionImple
- -->
- <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
- value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"/>
-
- <!--you'll need something like this if the HornetQ Server is remote-->
- <!--
- <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
- value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/>-->
-
- <!--you'll need something like this if the HornetQ Server is remote and has failover configured-->
- <!--
- <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
- value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/>-->
-
-
- <property name="com.arjuna.ats.jta.xaRecoveryNode" value="1"/>
- </properties>
- <properties depends="arjuna,txoj,jta" name="recoverymanager">
- <!--
- Properties used only by the RecoveryManager.
- -->
- <!--
- Periodic recovery settings.
- Time values in this section are in seconds.
- -->
- <!--
- Interval in seconds between initiating the periodic recovery modules.
- Default is 120 seconds.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.periodicRecoveryPeriod" value="120"/>
- <!--
- Interval in seconds between first and second pass of periodic recovery.
- Default is 10 seconds.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.recoveryBackoffPeriod" value="10"/>
- <!--
- The port number on which the recovery manager listens.
- -->
- <property name="com.arjuna.ats.arjuna.recovery.recoveryPort" value="4712"/>
- <!--
- The address on which the recovery manager listens.
- If running within an AS then the address the AS is bound to (jboss.bind.address) takes precedence
- -->
- <property name="com.arjuna.ats.arjuna.recovery.recoveryAddress" value=""/>
- <!--
- Periodic recovery modules to use. Invoked in sort-order of names.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.recoveryExtension1" value="com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule"/>
- <property
- name="com.arjuna.ats.arjuna.recovery.recoveryExtension2" value="com.arjuna.ats.internal.txoj.recovery.TORecoveryModule"/>
- <property
- name="com.arjuna.ats.arjuna.recovery.recoveryExtension3" value="com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule"/>
- <!--
- Expired entry removal
- -->
- <!--
- Expiry scanners to use (order of invocation is random).
- Names must begin with "com.arjuna.ats.arjuna.recovery.expiryScanner"
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.expiryScannerTransactionStatusManager" value="com.arjuna.ats.internal.arjuna.recovery.ExpiredTransactionStatusManagerScanner"/>
- <!--
- Interval, in hours, between running the expiry scanners.
- This can be quite long. The absolute value determines the interval -
- if the value is negative, the scan will NOT be run until after one
- interval has elapsed. If positive the first scan will be immediately
- after startup. Zero will prevent any scanning.
- Default = 12 = run immediately, then every 12 hours.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.expiryScanInterval" value="12"/>
- <!--
- This is the interval, in hours, after which a process that cannot be contacted will be considered dead.
- It should be long enough to avoid accidentally removing valid entries due to short lived
- transient errors such as network downtime. Zero = Never removed. Default is 12.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.transactionStatusManagerExpiryTime" value="12"/>
- <!--
- Use this to fix the port on which the TransactionStatusManager listens,
- The default behaviour is to use any free port.
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.transactionStatusManagerPort" value="0"/>
- <!--
- Use this to fix the address on which the TransactionStatusManager binds,
- The default behaviour is to use the loopback address (ie localhost).
- If running within an AS then the address the AS is bound to (jboss.bind.address) takes precedence
- -->
- <property
- name="com.arjuna.ats.arjuna.recovery.transactionStatusManagerAddress" value=""/>
- <!--
- For cases where the recovery manager is in process with the transaction manager and nothing else uses
- the ObjectStore, it is possible to disable the socket based recovery listener by setting this to NO.
- Caution: use of this property can allow multiple recovery processes to run on the same ObjectStore
- if you are not careful. That in turn can lead to incorrect transaction processing. Use with care.
- -->
- <property name="com.arjuna.ats.arjuna.recovery.recoveryListener" value="YES"/>
- </properties>
- <properties depends="jta" name="jdbc">
- <!--
- property name="com.arjuna.ats.jdbc.isolationLevel" value="TRANSACTION_SERIALIZABLE"/>
- -->
- </properties>
-</transaction-service>
Added: branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java (rev 0)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQResourceRecovery.java 2011-09-26 09:54:29 UTC (rev 11415)
@@ -0,0 +1,72 @@
+/*
+* JBoss, Home of Professional Open Source.
+* Copyright 2010, Red Hat, Inc., and individual contributors
+* as indicated by the @author tags. See the copyright.txt file in the
+* distribution for a full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.hornetq.jms.server.recovery;
+
+import org.jboss.tm.XAResourceRecovery;
+
+import javax.transaction.xa.XAResource;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * 9/20/11
+ */
+public class HornetQResourceRecovery implements XAResourceRecovery
+{
+ private final XARecoveryConfig config;
+
+ private final HornetQXAResourceWrapper resourceWrapper;
+
+ public HornetQResourceRecovery(XARecoveryConfig config)
+ {
+ this.config = config;
+ resourceWrapper = new HornetQXAResourceWrapper(config);
+ }
+
+ public XAResource[] getXAResources()
+ {
+ return new XAResource[]{resourceWrapper};
+ }
+
+ public void close()
+ {
+ resourceWrapper.close();
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ HornetQResourceRecovery that = (HornetQResourceRecovery) o;
+
+ if (config != null ? !config.equals(that.config) : that.config != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return config != null ? config.hashCode() : 0;
+ }
+}
Deleted: branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java 2011-09-26 05:32:50 UTC (rev 11414)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java 2011-09-26 09:54:29 UTC (rev 11415)
@@ -1,224 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.server.recovery;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.transaction.xa.XAResource;
-
-import com.arjuna.ats.jta.recovery.XAResourceRecovery;
-
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.logging.Logger;
-
-/**
- *
- * A XAResourceRecovery instance that can be used to recover any JMS provider.
- *
- * In reality only recover,rollback and commit will be called but we still need to
- * be implement all methods just in case.
- *
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class HornetQXAResourceRecovery implements XAResourceRecovery
-{
- private final boolean trace = HornetQXAResourceRecovery.log.isTraceEnabled();
-
- private static final Logger log = Logger.getLogger(HornetQXAResourceRecovery.class);
-
- private boolean hasMore;
-
- private HornetQXAResourceWrapper res;
-
- public HornetQXAResourceRecovery()
- {
- if (trace)
- {
- HornetQXAResourceRecovery.log.trace("Constructing HornetQXAResourceRecovery");
- }
- }
-
- public boolean initialise(final String config)
- {
- if (HornetQXAResourceRecovery.log.isTraceEnabled())
- {
- HornetQXAResourceRecovery.log.trace(this + " intialise: " + config);
- }
-
- String[] configs = config.split(";");
- XARecoveryConfig[] xaRecoveryConfigs = new XARecoveryConfig[configs.length];
- for (int i = 0, configsLength = configs.length; i < configsLength; i++)
- {
- String s = configs[i];
- ConfigParser parser = new ConfigParser(s);
- String connectorFactoryClassName = parser.getConnectorFactoryClassName();
- Map<String, Object> connectorParams = parser.getConnectorParameters();
- String username = parser.getUsername();
- String password = parser.getPassword();
- TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams);
- xaRecoveryConfigs[i] = new XARecoveryConfig(transportConfiguration, username, password);
- }
-
-
-
- res = new HornetQXAResourceWrapper(xaRecoveryConfigs);
-
- if (HornetQXAResourceRecovery.log.isTraceEnabled())
- {
- HornetQXAResourceRecovery.log.trace(this + " initialised");
- }
-
- return true;
- }
-
- public boolean hasMoreResources()
- {
- if (HornetQXAResourceRecovery.log.isTraceEnabled())
- {
- HornetQXAResourceRecovery.log.trace(this + " hasMoreResources");
- }
-
- /*
- * The way hasMoreResources is supposed to work is as follows:
- * For each "sweep" the recovery manager will call hasMoreResources, then if it returns
- * true it will call getXAResource.
- * It will repeat that until hasMoreResources returns false.
- * Then the sweep is over.
- * For the next sweep hasMoreResources should return true, etc.
- *
- * In our case where we only need to return one XAResource per sweep,
- * hasMoreResources should basically alternate between true and false.
- *
- *
- */
-
- hasMore = !hasMore;
-
- return hasMore;
- }
-
- public XAResource getXAResource()
- {
- if (HornetQXAResourceRecovery.log.isTraceEnabled())
- {
- HornetQXAResourceRecovery.log.trace(this + " getXAResource");
- }
-
- return res;
- }
-
- public XAResource[] getXAResources()
- {
- return new XAResource[] { res };
- }
-
- @Override
- protected void finalize()
- {
- res.close();
- }
-
- public static class ConfigParser
- {
- private final String connectorFactoryClassName;
-
- private final Map<String, Object> connectorParameters;
-
- private String username;
-
- private String password;
-
- public ConfigParser(final String config)
- {
- if (config == null || config.length() == 0)
- {
- throw new IllegalArgumentException("Must specify provider connector factory class name in config");
- }
-
- String[] strings = config.split(",");
-
- // First (mandatory) param is the connector factory class name
- if (strings.length < 1)
- {
- throw new IllegalArgumentException("Must specify provider connector factory class name in config");
- }
-
- connectorFactoryClassName = strings[0].trim();
-
- // Next two (optional) parameters are the username and password to use for creating the session for recovery
-
- if (strings.length >= 2)
- {
-
- username = strings[1].trim();
- if (username.length() == 0)
- {
- username = null;
- }
-
- if (strings.length == 2)
- {
- throw new IllegalArgumentException("If username is specified, password must be specified too");
- }
-
- password = strings[2].trim();
- if (password.length() == 0)
- {
- password = null;
- }
- }
-
- // other tokens are for connector configurations
- connectorParameters = new HashMap<String, Object>();
- if (strings.length >= 3)
- {
- for (int i = 3; i < strings.length; i++)
- {
- String[] str = strings[i].split("=");
- if (str.length == 2)
- {
- connectorParameters.put(str[0].trim(), str[1].trim());
- }
- }
- }
- }
-
- public String getConnectorFactoryClassName()
- {
- return connectorFactoryClassName;
- }
-
- public Map<String, Object> getConnectorParameters()
- {
- return connectorParameters;
- }
-
- public String getUsername()
- {
- return username;
- }
-
- public String getPassword()
- {
- return password;
- }
- }
-}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2011-09-26 05:32:50 UTC (rev 11414)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2011-09-26 09:54:29 UTC (rev 11415)
@@ -60,7 +60,7 @@
private XARecoveryConfig[] xaRecoveryConfigs;
- private TransportConfiguration currentConnection;
+ //private TransportConfiguration currentConnection;
public HornetQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs)
{
@@ -71,7 +71,7 @@
public Xid[] recover(final int flag) throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("Recover " + currentConnection);
+ HornetQXAResourceWrapper.log.debug("Recover " + xaResource);
try
{
return xaResource.recover(flag);
@@ -85,7 +85,7 @@
public void commit(final Xid xid, final boolean onePhase) throws XAException
{
XAResource xaResource = getDelegate(true);
- HornetQXAResourceWrapper.log.debug("Commit " + currentConnection + " xid " + " onePhase=" + onePhase);
+ HornetQXAResourceWrapper.log.debug("Commit " + xaResource + " xid " + " onePhase=" + onePhase);
try
{
xaResource.commit(xid, onePhase);
@@ -99,7 +99,7 @@
public void rollback(final Xid xid) throws XAException
{
XAResource xaResource = getDelegate(true);
- HornetQXAResourceWrapper.log.debug("Rollback " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("Rollback " + xaResource + " xid ");
try
{
xaResource.rollback(xid);
@@ -113,7 +113,7 @@
public void forget(final Xid xid) throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("Forget " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("Forget " + xaResource + " xid ");
try
{
xaResource.forget(xid);
@@ -145,7 +145,7 @@
public int prepare(final Xid xid) throws XAException
{
XAResource xaResource = getDelegate(true);
- HornetQXAResourceWrapper.log.debug("prepare " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("prepare " + xaResource + " xid ");
try
{
return xaResource.prepare(xid);
@@ -159,7 +159,7 @@
public void start(final Xid xid, final int flags) throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("start " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("start " + xaResource + " xid ");
try
{
xaResource.start(xid, flags);
@@ -173,7 +173,7 @@
public void end(final Xid xid, final int flags) throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("end " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("end " + xaResource + " xid ");
try
{
xaResource.end(xid, flags);
@@ -187,7 +187,7 @@
public int getTransactionTimeout() throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("getTransactionTimeout " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("getTransactionTimeout " + xaResource + " xid ");
try
{
return xaResource.getTransactionTimeout();
@@ -201,7 +201,7 @@
public boolean setTransactionTimeout(final int seconds) throws XAException
{
XAResource xaResource = getDelegate(false);
- HornetQXAResourceWrapper.log.debug("setTransactionTimeout " + currentConnection + " xid ");
+ HornetQXAResourceWrapper.log.debug("setTransactionTimeout " + xaResource + " xid ");
try
{
return xaResource.setTransactionTimeout(seconds);
@@ -214,7 +214,7 @@
public void connectionFailed(final HornetQException me, boolean failedOver)
{
- HornetQXAResourceWrapper.log.warn("Notified of connection failure in xa recovery connectionFactory for provider " + currentConnection + " will attempt reconnect on next pass",
+ HornetQXAResourceWrapper.log.warn("Notified of connection failure in xa recovery connectionFactory for provider " + csf + " will attempt reconnect on next pass",
me);
close();
}
@@ -299,7 +299,7 @@
try
{
- serverLocator = HornetQClient.createServerLocatorWithoutHA(xaRecoveryConfig.getTransportConfiguration());
+ serverLocator = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator();
serverLocator.disableFinalizeCheck();
csf = serverLocator.createSessionFactory();
if (xaRecoveryConfig.getUsername() == null)
@@ -320,12 +320,10 @@
synchronized (HornetQXAResourceWrapper.lock)
{
delegate = cs;
- currentConnection = xaRecoveryConfig.getTransportConfiguration();
}
return delegate;
}
- currentConnection = null;
throw new HornetQException(HornetQException.NOT_CONNECTED);
}
Added: branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java (rev 0)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/RecoveryRegistry.java 2011-09-26 09:54:29 UTC (rev 11415)
@@ -0,0 +1,36 @@
+/*
+* JBoss, Home of Professional Open Source.
+* Copyright 2010, Red Hat, Inc., and individual contributors
+* as indicated by the @author tags. See the copyright.txt file in the
+* distribution for a full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.hornetq.jms.server.recovery;
+
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.ra.HornetQRAConnectionFactory;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * 9/20/11
+ */
+public interface RecoveryRegistry
+{
+ void register(HornetQResourceRecovery resourceRecovery);
+
+ void unRegister(HornetQResourceRecovery resourceRecovery);
+}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2011-09-26 05:32:50 UTC (rev 11414)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2011-09-26 09:54:29 UTC (rev 11415)
@@ -13,7 +13,7 @@
package org.hornetq.jms.server.recovery;
-import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.jms.client.HornetQConnectionFactory;
/**
* @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
@@ -24,20 +24,20 @@
*/
public class XARecoveryConfig
{
- private final TransportConfiguration transportConfiguration;
+ private final HornetQConnectionFactory hornetQConnectionFactory;
private final String username;
private final String password;
- public XARecoveryConfig(TransportConfiguration transportConfiguration, String username, String password)
+ public XARecoveryConfig(HornetQConnectionFactory hornetQConnectionFactory, String username, String password)
{
- this.transportConfiguration = transportConfiguration;
+ this.hornetQConnectionFactory = hornetQConnectionFactory;
this.username = username;
this.password = password;
}
- public TransportConfiguration getTransportConfiguration()
+ public HornetQConnectionFactory getHornetQConnectionFactory()
{
- return transportConfiguration;
+ return hornetQConnectionFactory;
}
public String getUsername()
@@ -49,4 +49,29 @@
{
return password;
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ XARecoveryConfig that = (XARecoveryConfig) o;
+
+ if (hornetQConnectionFactory != null ? !hornetQConnectionFactory.equals(that.hornetQConnectionFactory) : that.hornetQConnectionFactory != null)
+ return false;
+ if (password != null ? !password.equals(that.password) : that.password != null) return false;
+ if (username != null ? !username.equals(that.username) : that.username != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = hornetQConnectionFactory != null ? hornetQConnectionFactory.hashCode() : 0;
+ result = 31 * result + (username != null ? username.hashCode() : 0);
+ result = 31 * result + (password != null ? password.hashCode() : 0);
+ return result;
+ }
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAManagedConnection.java 2011-09-26 05:32:50 UTC (rev 11414)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAManagedConnection.java 2011-09-26 09:54:29 UTC (rev 11415)
@@ -249,6 +249,8 @@
HornetQRAManagedConnection.log.debug("Error unsetting the exception listener " + this, e);
}
+ mcf.stop();
+
destroyHandles();
try
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2011-09-26 05:32:50 UTC (rev 11414)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2011-09-26 09:54:29 UTC (rev 11415)
@@ -30,6 +30,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.recovery.HornetQResourceRecovery;
/**
* HornetQ ManagedConectionFactory
@@ -76,6 +77,11 @@
*/
private HornetQConnectionFactory connectionFactory;
+ /*
+ * The resource recovery if there is one
+ * */
+ private HornetQResourceRecovery resourceRecovery;
+
/**
* Constructor
*/
@@ -747,6 +753,7 @@
if (connectionFactory == null)
{
connectionFactory = ra.createHornetQConnectionFactory(mcfProperties);
+ resourceRecovery = ra.getRecoveryManager().register(connectionFactory, null, null);
}
return connectionFactory;
}
@@ -791,4 +798,12 @@
return info;
}
}
+
+ public void stop()
+ {
+ if(resourceRecovery != null)
+ {
+ ra.getRecoveryManager().unRegister(resourceRecovery);
+ }
+ }
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-09-26 05:32:50 UTC (rev 11414)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-09-26 09:54:29 UTC (rev 11415)
@@ -41,8 +41,11 @@
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.recovery.RecoveryRegistry;
+import org.hornetq.jms.server.recovery.XARecoveryConfig;
import org.hornetq.ra.inflow.HornetQActivation;
import org.hornetq.ra.inflow.HornetQActivationSpec;
+import org.hornetq.ra.recovery.RecoveryManager;
/**
* The resource adapter for HornetQ
@@ -106,6 +109,8 @@
private String unparsedJndiParams;
+ RecoveryManager recoveryManager;
+
/**
* Constructor
*/
@@ -119,6 +124,7 @@
raProperties = new HornetQRAProperties();
configured = new AtomicBoolean(false);
activations = new ConcurrentHashMap<ActivationSpec, HornetQActivation>();
+ recoveryManager = new RecoveryManager();
}
public TransactionManager getTM()
@@ -209,6 +215,8 @@
locateTM();
+ recoveryManager.start();
+
this.ctx = ctx;
HornetQResourceAdapter.log.info("HornetQ resource adaptor started");
@@ -241,8 +249,12 @@
if (defaultHornetQConnectionFactory != null)
{
defaultHornetQConnectionFactory.close();
+
+ XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(defaultHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
}
+ recoveryManager.stop();
+
HornetQResourceAdapter.log.info("HornetQ resource adapter stopped");
}
@@ -1381,6 +1393,13 @@
}
+
+
+ public RecoveryManager getRecoveryManager()
+ {
+ return recoveryManager;
+ }
+
/**
* Get the resource adapter properties
*
@@ -1402,6 +1421,7 @@
protected void setup() throws HornetQException
{
defaultHornetQConnectionFactory = createHornetQConnectionFactory(raProperties);
+ recoveryManager.register(defaultHornetQConnectionFactory, raProperties.getUserName(), raProperties.getPassword());
}
public Map<ActivationSpec, HornetQActivation> getActivations()
@@ -1439,7 +1459,7 @@
{
ha = HornetQClient.DEFAULT_IS_HA;
}
-
+
if (connectorClassName != null)
{
TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/ra/Util.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/ra/Util.java 2011-09-26 05:32:50 UTC (rev 11414)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/ra/Util.java 2011-09-26 09:54:29 UTC (rev 11415)
@@ -19,7 +19,7 @@
import javax.transaction.TransactionManager;
import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.ClassloadingUtil;
+import org.hornetq.jms.server.recovery.RecoveryRegistry;
/**
* Various utility functions
@@ -257,7 +257,7 @@
{
try
{
- ClassLoader loader = HornetQResourceAdapter.class.getClassLoader();
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
Class<?> aClass = loader.loadClass(locatorClass);
Object o = aClass.newInstance();
Method m = aClass.getMethod(locatorMethod);
@@ -266,14 +266,18 @@
catch (Throwable e)
{
log.debug(e.getMessage(), e);
+ return null;
}
+ }
+
+ public static RecoveryRegistry locateRecoveryRegistry(final String locatorClass)
+ {
try
{
ClassLoader loader = Thread.currentThread().getContextClassLoader();
Class<?> aClass = loader.loadClass(locatorClass);
Object o = aClass.newInstance();
- Method m = aClass.getMethod(locatorMethod);
- return (TransactionManager)m.invoke(o);
+ return (RecoveryRegistry)o;
}
catch (Throwable e)
{
@@ -281,6 +285,4 @@
return null;
}
}
-
-
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2011-09-26 05:32:50 UTC (rev 11414)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2011-09-26 09:54:29 UTC (rev 11415)
@@ -37,6 +37,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.server.recovery.HornetQResourceRecovery;
import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.ra.Util;
@@ -106,7 +107,10 @@
// Whether we are in the failure recovery loop
private AtomicBoolean inFailure = new AtomicBoolean(false);
-
+
+ /** Used to unregister recovery once this endpoint has gone*/
+ private HornetQResourceRecovery resourceRecovery;
+
static
{
try
@@ -314,6 +318,11 @@
{
HornetQActivation.log.debug("Tearing down " + spec);
+ if(resourceRecovery != null)
+ {
+ ra.getRecoveryManager().unRegister(resourceRecovery);
+ }
+
for (HornetQMessageHandler handler : handlers)
{
handler.teardown();
@@ -331,6 +340,7 @@
if (spec.isHasBeenUpdated())
{
factory = ra.createHornetQConnectionFactory(spec);
+ resourceRecovery = ra.getRecoveryManager().register(factory, spec.getUser(), spec.getPassword());
}
else
{
Added: branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java (rev 0)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java 2011-09-26 09:54:29 UTC (rev 11415)
@@ -0,0 +1,164 @@
+/*
+* JBoss, Home of Professional Open Source.
+* Copyright 2010, Red Hat, Inc., and individual contributors
+* as indicated by the @author tags. See the copyright.txt file in the
+* distribution for a full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.hornetq.ra.recovery;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.recovery.HornetQResourceRecovery;
+import org.hornetq.jms.server.recovery.RecoveryRegistry;
+import org.hornetq.jms.server.recovery.XARecoveryConfig;
+import org.hornetq.ra.Util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * 9/21/11
+ */
+public class RecoveryManager
+{
+ private static Logger log = Logger.getLogger(RecoveryManager.class);
+
+ private RecoveryRegistry registry;
+
+ private String resourceRecoveryClassNames = "org.jboss.as.integration.hornetq.recovery.AS5RecoveryRegistry;org.jboss.as.messaging.jms.AS7RecoveryRegistry";
+
+ private Map<XARecoveryConfig, HornetQResourceRecovery> configMap = new HashMap<XARecoveryConfig, HornetQResourceRecovery>();
+
+ public void start()
+ {
+ locateRecoveryRegistry();
+ }
+
+ public HornetQResourceRecovery register(HornetQConnectionFactory factory, String userName, String password)
+ {
+ if(!isRegistered(factory))
+ {
+ XARecoveryConfig xaRecoveryConfig = new XARecoveryConfig(factory, userName, password);
+ HornetQResourceRecovery resourceRecovery = new HornetQResourceRecovery(xaRecoveryConfig);
+ registry.register(resourceRecovery);
+ configMap.put(xaRecoveryConfig, resourceRecovery);
+ return resourceRecovery;
+ }
+ return null;
+ }
+
+ public void unRegister(HornetQResourceRecovery resourceRecovery)
+ {
+ registry.unRegister(resourceRecovery);
+ resourceRecovery.close();
+ }
+
+ public void stop()
+ {
+ for (HornetQResourceRecovery hornetQResourceRecovery : configMap.values())
+ {
+ registry.unRegister(hornetQResourceRecovery);
+ hornetQResourceRecovery.close();
+ }
+ configMap.clear();
+ }
+
+ private void locateRecoveryRegistry()
+ {
+ String locatorClasses[] = resourceRecoveryClassNames.split(";");
+
+ for (int i = 0 ; i < locatorClasses.length; i++)
+ {
+ registry = Util.locateRecoveryRegistry(locatorClasses[i]);
+ if (registry != null)
+ {
+ break;
+ }
+ }
+
+ if (registry == null)
+ {
+ registry = new RecoveryRegistry()
+ {
+ public void register(HornetQResourceRecovery resourceRecovery)
+ {
+ //no op
+ }
+
+ public void unRegister(HornetQResourceRecovery xaRecoveryConfig)
+ {
+ //no op
+ }
+ };
+ }
+ else
+ {
+ log.debug("Recovery Registry located = " + registry);
+ }
+ }
+
+
+ public boolean isRegistered(HornetQConnectionFactory factory)
+ {
+ for (XARecoveryConfig xaRecoveryConfig : configMap.keySet())
+ {
+ TransportConfiguration[] transportConfigurations = factory.getServerLocator().getStaticTransportConfigurations();
+
+ if (transportConfigurations != null)
+ {
+ TransportConfiguration[] xaConfigurations = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator().getStaticTransportConfigurations();
+ if(xaConfigurations == null)
+ {
+ break;
+ }
+ if(transportConfigurations.length != xaConfigurations.length)
+ {
+ break;
+ }
+ boolean theSame=true;
+ for(int i = 0; i < transportConfigurations.length; i++)
+ {
+ TransportConfiguration tc = transportConfigurations[i];
+ TransportConfiguration xaTc = xaConfigurations[i];
+ if(!tc.equals(xaTc))
+ {
+ theSame = false;
+ break;
+ }
+ }
+ if(theSame)
+ {
+ return theSame;
+ }
+ }
+ else
+ {
+ DiscoveryGroupConfiguration discoveryGroupConfiguration = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator().getDiscoveryGroupConfiguration();
+ if(discoveryGroupConfiguration != null && discoveryGroupConfiguration.equals(factory.getDiscoveryGroupConfiguration()))
+ {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+}
12 years, 9 months
JBoss hornetq SVN: r11414 - in branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp: v11 and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-26 01:32:50 -0400 (Mon, 26 Sep 2011)
New Revision: 11414
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV10.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
test
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-26 05:11:45 UTC (rev 11413)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-26 05:32:50 UTC (rev 11414)
@@ -82,8 +82,7 @@
public ClientStompFrame createFrame(
String command)
{
- // TODO Auto-generated method stub
- return null;
+ return new ClientStompFrameV10(command);
}
@Override
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV10.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV10.java 2011-09-26 05:11:45 UTC (rev 11413)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV10.java 2011-09-26 05:32:50 UTC (rev 11414)
@@ -39,8 +39,6 @@
public ClientStompFrame createFrame(String data)
{
- System.out.println("Raw data is: " + data + "|");
-
//split the string at "\n\n"
String[] dataFields = data.split("\n\n");
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-26 05:11:45 UTC (rev 11413)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-26 05:32:50 UTC (rev 11414)
@@ -978,8 +978,33 @@
connV11.disconnect();
newConn.disconnect();
}
+
+ public void testSendAndReceiveOnDifferentConnections() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame sendFrame = connV11.createFrame("SEND");
+ sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ sendFrame.setBody("Hello World");
- //tests below are adapted from StompTest
+ connV11.sendFrame(sendFrame);
+
+ StompClientConnection connV11_2 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ connV11_2.connect(defUser, defPass);
+
+ this.subscribe(connV11_2, "sub1", "auto");
+
+ ClientStompFrame frame = connV11_2.receiveFrame(2000);
+
+ assertEquals("MESSAGE", frame.getCommand());
+ assertEquals("Hello World", frame.getBody());
+
+ connV11.disconnect();
+ connV11_2.disconnect();
+ }
+
+ //----------------Note: tests below are adapted from StompTest
+
public void testBeginSameTransactionTwice() throws Exception
{
connV11.connect(defUser, defPass);
12 years, 9 months
JBoss hornetq SVN: r11413 - branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-26 01:11:45 -0400 (Mon, 26 Sep 2011)
New Revision: 11413
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
Log:
comment
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-25 13:40:24 UTC (rev 11412)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-26 05:11:45 UTC (rev 11413)
@@ -18,11 +18,13 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.Executor;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
@@ -74,7 +76,7 @@
{
StompConnection conn = new StompConnection(connection, this);
- // Note that STOMP has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
+ // Note that STOMP 1.0 has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
// will be timed out and closed!
long ttl = server.getConfiguration().getConnectionTTLOverride();
12 years, 9 months
JBoss hornetq SVN: r11412 - branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-25 09:40:24 -0400 (Sun, 25 Sep 2011)
New Revision: 11412
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
Log:
fix compilation error
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-25 08:18:23 UTC (rev 11411)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-25 13:40:24 UTC (rev 11412)
@@ -166,6 +166,11 @@
init();
}
+ public StompDecoder()
+ {
+ init();
+ }
+
public boolean hasBytes()
{
return data > pos;
@@ -184,7 +189,7 @@
*/
public synchronized StompFrame decode(final HornetQBuffer buffer) throws Exception
{
- if (connection.isValid())
+ if (connection != null && connection.isValid())
{
VersionedStompFrameHandler handler = connection.getFrameHandler();
return handler.decode(this, buffer);
12 years, 9 months
JBoss hornetq SVN: r11411 - branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-25 04:18:23 -0400 (Sun, 25 Sep 2011)
New Revision: 11411
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
Log:
deadlock found in perf test
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-25 07:39:38 UTC (rev 11410)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-25 08:18:23 UTC (rev 11411)
@@ -80,6 +80,8 @@
private boolean initialized;
private FrameEventListener stompListener;
+
+ private final Object sendLock = new Object();
public StompDecoder getDecoder()
{
@@ -202,7 +204,10 @@
internalClose();
- callClosingListeners();
+ synchronized(sendLock)
+ {
+ callClosingListeners();
+ }
}
private void internalClose()
@@ -698,7 +703,11 @@
public void physicalSend(StompFrame frame) throws Exception
{
HornetQBuffer buffer = frame.toHornetQBuffer();
- getTransportConnection().write(buffer, false, false);
+
+ synchronized (sendLock)
+ {
+ getTransportConnection().write(buffer, false, false);
+ }
if (stompListener != null)
{
12 years, 9 months