JBoss hornetq SVN: r8007 - in trunk: tests/src/org/hornetq/tests/integration/jms/client and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-29 09:34:52 -0400 (Tue, 29 Sep 2009)
New Revision: 8007
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/client/MessageTest.java
Modified:
trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
Log:
added message test
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2009-09-29 01:14:33 UTC (rev 8006)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2009-09-29 13:34:52 UTC (rev 8007)
@@ -11,7 +11,6 @@
* permissions and limitations under the License.
*/
-
package org.hornetq.jms.client;
import java.io.InputStream;
@@ -246,7 +245,7 @@
}
- public HornetQMessage(byte type)
+ public HornetQMessage(final byte type)
{
message = new ClientMessageImpl(type,
true,
@@ -274,13 +273,13 @@
/**
* Constructor for when receiving a message from the server
*/
- public HornetQMessage(final ClientMessage message, ClientSession session)
+ public HornetQMessage(final ClientMessage message, final ClientSession session)
{
this.message = message;
- this.readOnly = true;
+ readOnly = true;
- this.propertiesReadOnly = true;
+ propertiesReadOnly = true;
this.session = session;
}
@@ -324,11 +323,11 @@
// We can't avoid a cast warning here since getPropertyNames() is on the JMS API
for (Enumeration<String> props = foreign.getPropertyNames(); props.hasMoreElements();)
{
- String name = (String)props.nextElement();
+ String name = props.nextElement();
Object prop = foreign.getObjectProperty(name);
- this.setObjectProperty(name, prop);
+ setObjectProperty(name, prop);
}
}
@@ -477,7 +476,7 @@
public void setJMSDestination(final Destination destination) throws JMSException
{
- this.dest = destination;
+ dest = destination;
}
public int getJMSDeliveryMode() throws JMSException
@@ -599,51 +598,78 @@
public boolean propertyExists(final String name) throws JMSException
{
return message.containsProperty(new SimpleString(name)) || name.equals(JMSXDELIVERYCOUNT) ||
- (JMSXGROUPID.equals(name) && message.containsProperty(MessageImpl.HDR_GROUP_ID));
+ JMSXGROUPID.equals(name) &&
+ message.containsProperty(MessageImpl.HDR_GROUP_ID);
}
public boolean getBooleanProperty(final String name) throws JMSException
{
Object value = message.getProperty(new SimpleString(name));
if (value == null)
+ {
return Boolean.valueOf(null).booleanValue();
+ }
if (value instanceof Boolean)
+ {
return ((Boolean)value).booleanValue();
+ }
else if (value instanceof SimpleString)
+ {
return Boolean.valueOf(((SimpleString)value).toString()).booleanValue();
+ }
else
+ {
throw new MessageFormatException("Invalid conversion");
+ }
}
public byte getByteProperty(final String name) throws JMSException
{
Object value = message.getProperty(new SimpleString(name));
if (value == null)
+ {
throw new NumberFormatException("Message property '" + name + "' not set.");
+ }
if (value instanceof Byte)
+ {
return ((Byte)value).byteValue();
+ }
else if (value instanceof SimpleString)
+ {
return Byte.parseByte(((SimpleString)value).toString());
+ }
else
+ {
throw new MessageFormatException("Invalid conversion");
+ }
}
public short getShortProperty(final String name) throws JMSException
{
Object value = message.getProperty(new SimpleString(name));
if (value == null)
+ {
throw new NumberFormatException("Message property '" + name + "' not set.");
+ }
if (value instanceof Byte)
+ {
return ((Byte)value).shortValue();
+ }
else if (value instanceof Short)
+ {
return ((Short)value).shortValue();
+ }
else if (value instanceof SimpleString)
+ {
return Short.parseShort(((SimpleString)value).toString());
+ }
else
+ {
throw new MessageFormatException("Invalid conversion");
+ }
}
public int getIntProperty(final String name) throws JMSException
@@ -657,7 +683,7 @@
if (value == null)
{
- throw new NumberFormatException("Message property '" + name + "' not set.");
+ return Integer.valueOf(null);
}
if (value instanceof Byte)
@@ -693,7 +719,7 @@
if (value == null)
{
- throw new NumberFormatException("Message property '" + name + "' not set.");
+ return Long.valueOf(null);
}
if (value instanceof Byte)
@@ -726,30 +752,48 @@
{
Object value = message.getProperty(new SimpleString(name));
if (value == null)
+ {
return Float.valueOf(null).floatValue();
+ }
if (value instanceof Float)
+ {
return ((Float)value).floatValue();
+ }
else if (value instanceof SimpleString)
+ {
return Float.parseFloat(((SimpleString)value).toString());
+ }
else
+ {
throw new MessageFormatException("Invalid conversion");
+ }
}
public double getDoubleProperty(final String name) throws JMSException
{
Object value = message.getProperty(new SimpleString(name));
if (value == null)
+ {
return Double.valueOf(null).doubleValue();
+ }
if (value instanceof Float)
+ {
return ((Float)value).doubleValue();
+ }
else if (value instanceof Double)
+ {
return ((Double)value).doubleValue();
+ }
else if (value instanceof SimpleString)
+ {
return Double.parseDouble(((SimpleString)value).toString());
+ }
else
+ {
throw new MessageFormatException("Invalid conversion");
+ }
}
public String getStringProperty(final String name) throws JMSException
@@ -768,7 +812,9 @@
value = message.getProperty(new SimpleString(name));
}
if (value == null)
+ {
return null;
+ }
if (value instanceof SimpleString)
{
@@ -896,6 +942,13 @@
public void setStringProperty(final String name, final String value) throws JMSException
{
checkProperty(name, value);
+
+ if (value == null)
+ {
+ // This is ok - when we try to read the same key it will return null too
+ return;
+ }
+
if (JMSXGROUPID.equals(name))
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, new SimpleString(value));
@@ -923,6 +976,12 @@
checkProperty(name, value);
+ if (value == null)
+ {
+ // This is ok - when we try to read the same key it will return null too
+ return;
+ }
+
if (JMS_HORNETQ_INPUT_STREAM.equals(name))
{
setInputStream((InputStream)value);
@@ -1058,7 +1117,7 @@
}
}
- public boolean waitCompletionOnStream(long timeWait) throws JMSException
+ public boolean waitCompletionOnStream(final long timeWait) throws JMSException
{
checkStream();
try
@@ -1071,6 +1130,7 @@
}
}
+ @Override
public String toString()
{
StringBuffer sb = new StringBuffer("HornetQMessage[");
@@ -1123,7 +1183,8 @@
{
throw new MessageNotWriteableException("You cannot set the Input Stream on received messages. Did you mean " + JMS_HORNETQ_OUTPUT_STREAM +
" or " +
- JMS_HORNETQ_SAVE_STREAM + "?");
+ JMS_HORNETQ_SAVE_STREAM +
+ "?");
}
else
{
@@ -1198,7 +1259,7 @@
return true;
}
- private void checkPriority(int priority) throws JMSException
+ private void checkPriority(final int priority) throws JMSException
{
if (priority < 0 || priority > 9)
{
Added: trunk/tests/src/org/hornetq/tests/integration/jms/client/MessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/MessageTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/MessageTest.java 2009-09-29 13:34:52 UTC (rev 8007)
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.client;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ *
+ * A MessageTest
+ *
+ * @author tim
+ *
+ *
+ */
+public class MessageTest extends JMSTestBase
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(MessageTest.class);
+
+ private static final long TIMEOUT = 1000;
+
+ private static final String propName1 = "myprop1";
+ private static final String propName2 = "myprop2";
+ private static final String propName3 = "myprop3";
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testNullProperties() throws Exception
+ {
+ Connection conn = cf.createConnection();
+
+ Queue queue = createQueue("testQueue");
+
+ try
+ {
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ conn.start();
+
+ Message msg = sess.createMessage();
+
+ msg.setObjectProperty(propName1, null);
+ msg.setStringProperty(propName2, null);
+
+ checkProperties(msg);
+
+ Message received = sendAndConsumeMessage(msg, prod, cons);
+
+ assertNotNull(received);
+
+ checkProperties(received);
+ }
+ finally
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Throwable igonred)
+ {
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private void checkProperties(final Message message) throws Exception
+ {
+ assertNull(message.getObjectProperty(propName1));
+ assertNull(message.getStringProperty(propName1));
+ assertNull(message.getStringProperty(propName2));
+ assertNull(message.getObjectProperty(propName2));
+ assertNull(message.getStringProperty(propName3));
+ assertNull(message.getObjectProperty(propName3));
+
+ try
+ {
+ log.info(message.getIntProperty(propName1));
+ fail("Should throw exception");
+ }
+ catch (NumberFormatException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ log.info(message.getShortProperty(propName1));
+ }
+ catch (NumberFormatException e)
+ {
+ //Ok
+ }
+ try
+ {
+ log.info(message.getByteProperty(propName1));
+ }
+ catch (NumberFormatException e)
+ {
+ //Ok
+ }
+ assertEquals(false, message.getBooleanProperty(propName1));
+ try
+ {
+ log.info(message.getLongProperty(propName1));
+ }
+ catch (NumberFormatException e)
+ {
+ //Ok
+ }
+ try
+ {
+ log.info(message.getFloatProperty(propName1));
+ }
+ catch (NullPointerException e)
+ {
+ //Ok
+ }
+ try
+ {
+ log.info(message.getDoubleProperty(propName1));
+ }
+ catch (NullPointerException e)
+ {
+ //Ok
+ }
+ }
+
+ private Message sendAndConsumeMessage(final Message msg, final MessageProducer prod, final MessageConsumer cons) throws Exception
+ {
+ prod.send(msg);
+
+ Message received = cons.receive(TIMEOUT);
+
+ return received;
+ }
+
+ // Inner classes -------------------------------------------------
+}
16 years, 2 months
JBoss hornetq SVN: r8006 - in branches/Replication_Clebert: src/main/org/hornetq/core/persistence and 6 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-28 21:14:33 -0400 (Mon, 28 Sep 2009)
New Revision: 8006
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
branches/Replication_Clebert/src/main/org/hornetq/core/journal/TestableJournal.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Changes
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java 2009-09-28 22:26:08 UTC (rev 8005)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java 2009-09-29 01:14:33 UTC (rev 8006)
@@ -74,7 +74,13 @@
void appendRollbackRecord(long txID, boolean sync) throws Exception;
// Load
+
+ /** This method could be promoted to {@link Journal} interface when we decide to use the loadManager
+ * instead of load(List,List)
+ */
+ long load(LoaderCallback reloadManager) throws Exception;
+
long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure) throws Exception;
int getAlignment() throws Exception;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/TestableJournal.java 2009-09-28 22:26:08 UTC (rev 8005)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/TestableJournal.java 2009-09-29 01:14:33 UTC (rev 8006)
@@ -46,11 +46,6 @@
int getMaxAIO();
- /** This method could be promoted to {@link Journal} interface when we decide to use the loadManager
- * instead of load(List,List)
- */
- long load(LoaderCallback reloadManager) throws Exception;
-
void forceMoveNextFile() throws Exception;
void setAutoReclaim(boolean autoReclaim);
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-09-28 22:26:08 UTC (rev 8005)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-09-29 01:14:33 UTC (rev 8006)
@@ -96,6 +96,10 @@
void deletePageTransactional(long txID, long recordID) throws Exception;
+ /** This method is only useful at the backup side. We only load internal structures making the journals ready for
+ * append mode on the backup side. */
+ void loadInternalOnly() throws Exception;
+
void loadMessageJournal(PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, Queue> queues,
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-09-28 22:26:08 UTC (rev 8005)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-09-29 01:14:33 UTC (rev 8006)
@@ -35,6 +35,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFile;
@@ -1087,7 +1088,42 @@
{
return started;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#loadInternalOnly()
+ */
+ public void loadInternalOnly() throws Exception
+ {
+ LoaderCallback dummyLoader = new LoaderCallback()
+ {
+
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ }
+
+ public void updateRecord(RecordInfo info)
+ {
+ }
+
+ public void deleteRecord(long id)
+ {
+ }
+
+ public void addRecord(RecordInfo info)
+ {
+ }
+
+ public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+ {
+ }
+ };
+
+ bindingsJournal.load(dummyLoader);
+ messageJournal.load(dummyLoader);
+ }
+
+
// Public -----------------------------------------------------------------------------------
public Journal getMessageJournal()
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-09-28 22:26:08 UTC (rev 8005)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-09-29 01:14:33 UTC (rev 8006)
@@ -238,4 +238,11 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#loadInternalOnly()
+ */
+ public void loadInternalOnly() throws Exception
+ {
+ }
+
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java 2009-09-28 22:26:08 UTC (rev 8005)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java 2009-09-29 01:14:33 UTC (rev 8006)
@@ -14,6 +14,7 @@
package org.hornetq.core.replication;
import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.server.HornetQComponent;
/**
* A ReplicationEndpoint
@@ -22,7 +23,7 @@
*
*
*/
-public interface ReplicationEndpoint extends ChannelHandler
+public interface ReplicationEndpoint extends ChannelHandler, HornetQComponent
{
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-09-28 22:26:08 UTC (rev 8005)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-09-29 01:14:33 UTC (rev 8006)
@@ -33,7 +33,7 @@
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
-
+
private final HornetQServer server;
// Static --------------------------------------------------------
@@ -41,7 +41,7 @@
// Constructors --------------------------------------------------
public ReplicationEndpointImpl(HornetQServer server)
{
- this.server = server ;
+ this.server = server;
}
// Public --------------------------------------------------------
@@ -53,6 +53,28 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#isStarted()
+ */
+ public boolean isStarted()
+ {
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#start()
+ */
+ public void start() throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#stop()
+ */
+ public void stop() throws Exception
+ {
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-28 22:26:08 UTC (rev 8005)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-29 01:14:33 UTC (rev 8006)
@@ -199,6 +199,8 @@
private ConnectionManager replicatingConnectionManager;
private ReplicationManager replicationManager;
+
+ private ReplicationEndpoint replicationEndpoint = new ReplicationEndpointImpl(this);
private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
@@ -593,7 +595,12 @@
public synchronized ReplicationEndpoint createReplicationEndpoint()
{
- return new ReplicationEndpointImpl(this);
+ if (replicationEndpoint == null)
+ {
+ replicationEndpoint = new ReplicationEndpointImpl(this);
+
+ }
+ return replicationEndpoint;
}
public void removeSession(final String name) throws Exception
@@ -891,21 +898,24 @@
{
// Handle backup server activation
- if (configuration.isSharedStore())
+ if (!configuration.isSharedStore())
{
- // Complete the startup procedure
+ if (replicationEndpoint == null)
+ {
+ log.warn("There is no replication endpoint, can't activate this backup server");
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Can't activate the server");
+ }
+
+ replicationEndpoint.stop();
+ }
+
+ // Complete the startup procedure
- log.info("Activating server");
+ log.info("Activating server");
- configuration.setBackup(false);
+ configuration.setBackup(false);
- initialisePart2();
- }
- else
- {
- // TODO
- // just load journal
- }
+ initialisePart2();
}
return true;
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-09-28 22:26:08 UTC (rev 8005)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-09-29 01:14:33 UTC (rev 8006)
@@ -1095,6 +1095,13 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#loadInternalOnly()
+ */
+ public void loadInternalOnly() throws Exception
+ {
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
16 years, 2 months
JBoss hornetq SVN: r8005 - in branches/Replication_Clebert: src/main/org/hornetq/core/remoting/impl/wireformat and 7 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-28 18:26:08 -0400 (Mon, 28 Sep 2009)
New Revision: 8005
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/CreateReplicationSessionMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPacket.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
Removed:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Backup
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-09-28 20:30:13 UTC (rev 8004)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -13,6 +13,8 @@
package org.hornetq.core.remoting.impl;
+
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
@@ -62,6 +64,7 @@
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
+import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
@@ -349,7 +352,13 @@
{
packet = new SessionSendContinuationMessage();
break;
- }
+ }
+
+ case CREATE_REPLICATION:
+ {
+ packet = new CreateReplicationSessionMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/CreateReplicationSessionMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/CreateReplicationSessionMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/CreateReplicationSessionMessage.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Clebert Suconic</a>
+ */
+public class CreateReplicationSessionMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long sessionChannelID;
+
+ private int windowSize;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public CreateReplicationSessionMessage(final long sessionChannelID, final int windowSize)
+ {
+ super(CREATE_REPLICATION);
+
+ this.sessionChannelID = sessionChannelID;
+
+ this.windowSize = windowSize;
+ }
+
+ public CreateReplicationSessionMessage()
+ {
+ super(CREATE_REPLICATION);
+ }
+
+ // Public --------------------------------------------------------
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE +
+ // buffer.writeLong(sessionChannelID);
+ DataConstants.SIZE_LONG +
+ // buffer.writeInt(windowSize);
+ DataConstants.SIZE_INT;
+
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(sessionChannelID);
+ buffer.writeInt(windowSize);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ sessionChannelID = buffer.readLong();
+ windowSize = buffer.readInt();
+ }
+
+ /**
+ * @return the sessionChannelID
+ */
+ public long getSessionChannelID()
+ {
+ return sessionChannelID;
+ }
+
+ /**
+ * @return the windowSize
+ */
+ public int getWindowSize()
+ {
+ return windowSize;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-09-28 20:30:13 UTC (rev 8004)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -64,6 +64,8 @@
public static final byte CREATE_QUEUE = 34;
public static final byte DELETE_QUEUE = 35;
+
+ public static final byte CREATE_REPLICATION = 36;
// Session
@@ -136,6 +138,10 @@
public static final byte SESS_RECEIVE_MSG = 75;
public static final byte SESS_RECEIVE_CONTINUATION = 76;
+
+ // Replication
+
+ public static final byte REPLICATION_SEND_REPLICATION = 77;
// Static --------------------------------------------------------
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPacket.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPacket.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPacket.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+/**
+ * A ReplicationPacket
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationPacket extends PacketImpl
+{
+
+ /**
+ * @param type
+ */
+ public ReplicationPacket(byte type)
+ {
+ super(type);
+ // TODO Auto-generated constructor stub
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Deleted: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java 2009-09-28 20:30:13 UTC (rev 8004)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/HandlerFactory.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -1,32 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.remoting.server;
-
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.ChannelHandler;
-import org.hornetq.core.remoting.RemotingConnection;
-
-/**
- * The RemotingService could be used by either the Replication Manager or by the Server.
- *
- * Each will need to use a different Handler, so this factory may be used to pass what handler needs to be created
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface HandlerFactory
-{
- ChannelHandler getHandler(RemotingConnection conn, Channel channel);
-}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-09-28 20:30:13 UTC (rev 8004)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -42,7 +42,6 @@
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.Ping;
-import org.hornetq.core.remoting.server.HandlerFactory;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.spi.Acceptor;
import org.hornetq.core.remoting.spi.AcceptorFactory;
@@ -86,7 +85,7 @@
private final ExecutorFactory executorFactory;
- private final HandlerFactory handlerFactory;
+ private final HornetQServer server;
private ManagementService managementService;
@@ -105,7 +104,7 @@
// Constructors --------------------------------------------------
public RemotingServiceImpl(final Configuration config,
- final HandlerFactory handlerFactory,
+ final HornetQServer server,
final ExecutorFactory executorFactory,
final ManagementService managementService,
final Executor threadPool,
@@ -116,7 +115,7 @@
this.executorFactory = executorFactory;
- this.handlerFactory = handlerFactory;
+ this.server = server;
ClassLoader loader = Thread.currentThread().getContextClassLoader();
for (String interceptorClass : config.getInterceptorClassNames())
@@ -306,8 +305,19 @@
public void connectionCreated(final Connection connection)
{
- RemotingConnection rc = createChannel(connection);
+ RemotingConnection rc = new RemotingConnectionImpl(connection,
+ interceptors,
+ executorFactory != null ? executorFactory.getExecutor() : null);
+ Channel channel1 = rc.getChannel(1, -1, false);
+
+ ChannelHandler handler = createHandler(rc, channel1);
+
+ channel1.setHandler(handler);
+
+
+
+
long ttl = ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
if (config.getConnectionTTLOverride() != -1)
{
@@ -400,19 +410,9 @@
/**
* Subclasses (on tests) may use this to create a different channel.
*/
- protected RemotingConnection createChannel(final Connection connection)
+ protected ChannelHandler createHandler(final RemotingConnection rc, Channel channel)
{
- RemotingConnection rc = new RemotingConnectionImpl(connection,
- interceptors,
- executorFactory != null ? executorFactory.getExecutor() : null);
-
- Channel channel1 = rc.getChannel(1, -1, false);
-
- ChannelHandler handler = handlerFactory.getHandler(rc, channel1);
-
- channel1.setHandler(handler);
-
- return rc;
+ return new HornetQPacketHandler(server, channel, rc);
}
// Private -------------------------------------------------------
Deleted: branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java 2009-09-28 20:30:13 UTC (rev 8004)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/BackupListener.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -1,46 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication;
-
-/**
- * A ReplicationListener
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface BackupListener
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- void onReplication(byte data[]);
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication;
+
+import org.hornetq.core.remoting.ChannelHandler;
+
+/**
+ * A ReplicationEndpoint
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface ReplicationEndpoint extends ChannelHandler
+{
+
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-09-28 20:30:13 UTC (rev 8004)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -13,17 +13,15 @@
package org.hornetq.core.replication;
+import org.hornetq.core.server.HornetQComponent;
+
/**
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
*
*/
-public interface ReplicationManager
+public interface ReplicationManager extends HornetQComponent
{
void replicate(byte[] bytes, ReplicationToken token);
-
-
- /** to be used on the backup node only */
- void addBackupListener(BackupListener listener);
}
Copied: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java (from rev 7996, branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java)
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication.impl;
+
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPacket;
+import org.hornetq.core.replication.ReplicationEndpoint;
+import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.server.HornetQServer;
+
+/**
+ * A ReplicationPacketHandler
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationEndpointImpl implements ReplicationEndpoint
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final HornetQServer server;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+ public ReplicationEndpointImpl(HornetQServer server)
+ {
+ this.server = server ;
+ }
+
+ // Public --------------------------------------------------------
+ /* (non-Javadoc)
+ * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
+ */
+ public void handlePacket(Packet packet)
+ {
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-09-28 20:30:13 UTC (rev 8004)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -13,7 +13,10 @@
package org.hornetq.core.replication.impl;
-import org.hornetq.core.replication.BackupListener;
+import org.hornetq.core.client.impl.ConnectionManager;
+import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.replication.ReplicationToken;
@@ -31,30 +34,86 @@
// Attributes ----------------------------------------------------
+ // TODO: Should this be configurable or not?
+ private static final int WINDOW_SIZE = 100 * 1024;
+
+ private final ConnectionManager connectionManager;
+
+ private RemotingConnection connection;
+
+ private Channel replicatingChannel;
+
+ private boolean started;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
+ /**
+ * @param replicationConnectionManager
+ */
+ public ReplicationManagerImpl(ConnectionManager connectionManager)
+ {
+ super();
+ this.connectionManager = connectionManager;
+ }
+
// Public --------------------------------------------------------
/* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#addListener(org.hornetq.core.replication.ReplicationListener)
+ * @see org.hornetq.core.replication.ReplicationManager#replicate(byte[], org.hornetq.core.replication.ReplicationToken)
*/
- public void addBackupListener(BackupListener listener)
+ public void replicate(byte[] bytes, ReplicationToken token)
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#replicate(byte[], org.hornetq.core.replication.ReplicationToken)
+ * @see org.hornetq.core.server.HornetQComponent#isStarted()
*/
- public void replicate(byte[] bytes, ReplicationToken token)
+ public synchronized boolean isStarted()
{
- // TODO Auto-generated method stub
-
+ return this.started;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#start()
+ */
+ public synchronized void start() throws Exception
+ {
+ this.started = true;
+
+ connection = connectionManager.getConnection(1);
+
+ long channelID = connection.generateChannelID();
+
+ Channel mainChannel = connection.getChannel(1, -1, false);
+
+ Channel tempChannel = connection.getChannel(channelID, WINDOW_SIZE, false);
+
+ CreateReplicationSessionMessage replicationStartPackage = new CreateReplicationSessionMessage(channelID,
+ WINDOW_SIZE);
+
+ mainChannel.sendBlocking(replicationStartPackage);
+
+ this.replicatingChannel = tempChannel;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#stop()
+ */
+ public void stop() throws Exception
+ {
+ replicatingChannel.close();
+
+ this.started = false;
+
+ connection.destroy();
+
+ connection = null;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Deleted: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java 2009-09-28 20:30:13 UTC (rev 8004)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationPacketHandler.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -1,54 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication.impl;
-
-import org.hornetq.core.remoting.ChannelHandler;
-import org.hornetq.core.remoting.Packet;
-
-/**
- * A ReplicationPacketHandler
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ReplicationPacketHandler implements ChannelHandler
-{
-
- /* (non-Javadoc)
- * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
- */
- public void handlePacket(Packet packet)
- {
- }
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java 2009-09-28 20:30:13 UTC (rev 8004)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -27,6 +27,7 @@
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.hornetq.core.remoting.server.RemotingService;
+import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.security.HornetQSecurityManager;
import org.hornetq.core.security.Role;
import org.hornetq.core.server.cluster.ClusterManager;
@@ -36,7 +37,6 @@
import org.hornetq.core.version.Version;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.SimpleString;
-import org.hornetq.utils.UUID;
/**
* This interface defines the internal interface of the HornetQ Server exposed to other components of the server. The
@@ -69,6 +69,8 @@
void unregisterActivateCallback(ActivateCallback callback);
ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
+
+ ReplicationEndpoint createReplicationEndpoint();
CreateSessionResponseMessage createSession(String name,
long channelID,
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-09-28 20:30:13 UTC (rev 8004)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -16,6 +16,7 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
@@ -24,9 +25,12 @@
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
+import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
+import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
/**
@@ -91,6 +95,16 @@
break;
}
+ case CREATE_REPLICATION:
+ {
+ // Create queue can also be fielded here in the case of a replicated store and forward queue creation
+
+ CreateReplicationSessionMessage request = (CreateReplicationSessionMessage)packet;
+
+ handleCreateReplication(request);
+
+ break;
+ }
default:
{
log.error("Invalid packet " + packet);
@@ -174,6 +188,38 @@
log.error("Failed to handle create queue", e);
}
}
+
+ private void handleCreateReplication(CreateReplicationSessionMessage request)
+ {
+ Packet response;
+ try
+ {
+ Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize(), false);
+ ReplicationEndpoint endpoint = server.createReplicationEndpoint();
+ channel.setHandler(endpoint);
+ response = new NullResponseMessage();
+
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+
+ if (e instanceof HornetQException)
+ {
+ response = new HornetQExceptionMessage((HornetQException)e);
+ }
+ else
+ {
+ response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
+ }
+ }
+
+ channel1.send(response);
+ }
+
+
+
+
}
\ No newline at end of file
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-28 20:30:13 UTC (rev 8004)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -32,8 +32,11 @@
import javax.management.MBeanServer;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ConnectionManager;
+import org.hornetq.core.client.impl.ConnectionManagerImpl;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.core.config.cluster.QueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -65,13 +68,15 @@
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
-import org.hornetq.core.remoting.server.HandlerFactory;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
+import org.hornetq.core.replication.ReplicationEndpoint;
+import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
+import org.hornetq.core.replication.impl.ReplicationManagerImpl;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.HornetQSecurityManager;
import org.hornetq.core.security.Role;
@@ -192,6 +197,8 @@
private static AtomicInteger managementConnectorSequence = new AtomicInteger(0);
private ConnectionManager replicatingConnectionManager;
+
+ private ReplicationManager replicationManager;
private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
@@ -583,6 +590,11 @@
return new CreateSessionResponseMessage(true, version.getIncrementingVersion());
}
+
+ public synchronized ReplicationEndpoint createReplicationEndpoint()
+ {
+ return new ReplicationEndpointImpl(this);
+ }
public void removeSession(final String name) throws Exception
{
@@ -658,83 +670,45 @@
// }
// }
- // private boolean setupReplicatingConnection() throws Exception
- // {
- // String backupConnectorName = configuration.getBackupConnectorName();
- //
- // if (backupConnectorName != null)
- // {
- // TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
- //
- // if (backupConnector == null)
- // {
- // log.warn("connector with name '" + backupConnectorName + "' is not defined in the configuration.");
- // }
- // else
- // {
- // replicatingConnectionManager = new ConnectionManagerImpl(null,
- // backupConnector,
- // null,
- // false,
- // 1,
- // ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- // ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- // ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- // 0,
- // 1.0d,
- // 0,
- // threadPool,
- // scheduledPool);
- //
- // replicatingConnection = replicatingConnectionManager.getConnection(1);
- //
- // if (replicatingConnection != null)
- // {
- // replicatingChannel = replicatingConnection.getChannel(2, -1, false);
- //
- // replicatingConnection.addFailureListener(new FailureListener()
- // {
- // public void connectionFailed(HornetQException me)
- // {
- // replicatingChannel.executeOutstandingDelayedResults();
- // }
- // });
- //
- // // First time we get channel we send a message down it informing the backup of our node id -
- // // backup and live must have the same node id
- //
- // Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
- //
- // final Future future = new Future();
- //
- // replicatingChannel.replicatePacket(packet, 1, new Runnable()
- // {
- // public void run()
- // {
- // future.run();
- // }
- // });
- //
- // // This may take a while especially if the journal is large
- // boolean ok = future.await(60000);
- //
- // if (!ok)
- // {
- // throw new IllegalStateException("Timed out waiting for response from backup for initialisation");
- // }
- // }
- // else
- // {
- // log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
- //
- // return false;
- // }
- // }
- // }
- //
- // return true;
- // }
+ private boolean startReplication() throws Exception
+ {
+ String backupConnectorName = configuration.getBackupConnectorName();
+ if (backupConnectorName != null)
+ {
+ TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
+
+ if (backupConnector == null)
+ {
+ log.warn("connector with name '" + backupConnectorName + "' is not defined in the configuration.");
+ }
+ else
+ {
+
+ replicatingConnectionManager = new ConnectionManagerImpl(null,
+ backupConnector,
+ null,
+ false,
+ 1,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+ 0,
+ 1.0d,
+ 0,
+ false,
+ threadPool,
+ scheduledPool,
+ null);
+
+ this.replicationManager = new ReplicationManagerImpl(replicatingConnectionManager);
+ replicationManager.start();
+ }
+ }
+
+ return true;
+ }
+
public HornetQServerControlImpl getHornetQServerControl()
{
return messagingServerControl;
@@ -883,7 +857,7 @@
{
return threadPool;
}
-
+
/**
* This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance)
* @return
@@ -900,8 +874,6 @@
}
}
-
-
// Private
// --------------------------------------------------------------------------------------
@@ -961,25 +933,17 @@
managementService = new ManagementServiceImpl(mbeanServer, configuration, managementConnectorID);
- final HandlerFactory handlerFactory = new HandlerFactory()
- {
-
- public ChannelHandler getHandler(RemotingConnection conn, Channel channel)
- {
- return new HornetQPacketHandler(HornetQServerImpl.this, channel, conn);
- }
-
- };
-
remotingService = new RemotingServiceImpl(configuration,
- handlerFactory,
- (configuration.isAsyncConnectionExecutionEnabled() ? this.executorFactory : null),
+ this,
+ (configuration.isAsyncConnectionExecutionEnabled() ? this.executorFactory
+ : null),
managementService,
threadPool,
scheduledPool,
managementConnectorID);
- memoryManager = new MemoryManagerImpl(configuration.getMemoryWarningThreshold(), configuration.getMemoryMeasureInterval());
+ memoryManager = new MemoryManagerImpl(configuration.getMemoryWarningThreshold(),
+ configuration.getMemoryMeasureInterval());
memoryManager.start();
}
@@ -1182,7 +1146,7 @@
queues.put(queueBindingInfo.getPersistenceID(), queue);
postOffice.addBinding(binding);
-
+
managementService.registerAddress(queueBindingInfo.getAddress());
managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
}
@@ -1278,7 +1242,7 @@
}
postOffice.addBinding(binding);
-
+
managementService.registerAddress(address);
managementService.registerQueue(queue, address, storageManager);
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-09-28 20:30:13 UTC (rev 8004)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-09-28 22:26:08 UTC (rev 8005)
@@ -42,15 +42,15 @@
import org.hornetq.core.remoting.Interceptor;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
-import org.hornetq.core.remoting.server.HandlerFactory;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
-import org.hornetq.core.remoting.spi.Connection;
+import org.hornetq.core.replication.ReplicationEndpoint;
+import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
+import org.hornetq.core.replication.impl.ReplicationManagerImpl;
import org.hornetq.core.security.HornetQSecurityManager;
import org.hornetq.core.security.Role;
import org.hornetq.core.server.ActivateCallback;
@@ -59,7 +59,6 @@
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.cluster.ClusterManager;
-import org.hornetq.core.server.impl.HornetQPacketHandler;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.ResourceManager;
@@ -89,7 +88,7 @@
private ExecutorService executor;
- private ConnectionManager connectionManagerLive;
+ private ConnectionManager connectionManager;
private ScheduledExecutorService scheduledExecutor;
@@ -101,52 +100,39 @@
public void testBasicConnection() throws Exception
{
-
- RemotingConnection conn = connectionManagerLive.getConnection(1);
-
- Channel chann = conn.getChannel(2, -1, false);
-
- chann.close();
-
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+ manager.start();
+ manager.stop();
}
// Package protected ---------------------------------------------
-
- class LocalHandler implements ChannelHandler
+ class LocalRemotingServiceImpl extends RemotingServiceImpl
{
- final Channel channel;
-
- /**
- * @param channel
- */
- public LocalHandler(Channel channel)
+ public LocalRemotingServiceImpl(Configuration config,
+ HornetQServer server,
+ ExecutorFactory executorFactory,
+ ManagementService managementService,
+ Executor threadPool,
+ ScheduledExecutorService scheduledThreadPool,
+ int managementConnectorID)
{
- super();
- this.channel = channel;
+ super(config,
+ server,
+ executorFactory,
+ managementService,
+ threadPool,
+ scheduledThreadPool,
+ managementConnectorID);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
- */
- public void handlePacket(Packet packet)
+ protected ChannelHandler createHandler(RemotingConnection conn, Channel channel)
{
- channel.send(new NullResponseMessage());
+ return super.createHandler(conn, channel);
}
}
- HandlerFactory handlerFactory = new HandlerFactory()
- {
-
- public ChannelHandler getHandler(RemotingConnection conn, Channel channel)
- {
- System.out.println("Created a handler");
- return new LocalHandler(channel);
- }
-
- };
-
// Protected -----------------------------------------------------
protected void setUp() throws Exception
@@ -159,7 +145,7 @@
scheduledExecutor = new ScheduledThreadPoolExecutor(10, tFactory);
- remoting = new RemotingServiceImpl(config, handlerFactory, null, null, executor, scheduledExecutor, 0);
+ remoting = new LocalRemotingServiceImpl(config, new FakeServer(), null, null, executor, scheduledExecutor, 0);
remoting.start();
@@ -169,21 +155,21 @@
List<Interceptor> interceptors = new ArrayList<Interceptor>();
- connectionManagerLive = new ConnectionManagerImpl(null,
- connectorConfig,
- null,
- false,
- 1,
- ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
- ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
- 0,
- 1.0d,
- 0,
- false,
- executor,
- scheduledExecutor,
- interceptors);
+ connectionManager = new ConnectionManagerImpl(null,
+ connectorConfig,
+ null,
+ false,
+ 1,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+ 0,
+ 1.0d,
+ 0,
+ false,
+ executor,
+ scheduledExecutor,
+ interceptors);
}
@@ -206,4 +192,312 @@
// Inner classes -------------------------------------------------
+ static class FakeServer implements HornetQServer
+ {
+
+ public Queue createQueue(SimpleString address,
+ SimpleString queueName,
+ SimpleString filter,
+ boolean durable,
+ boolean temporary) throws Exception
+ {
+ return null;
+ }
+
+ public CreateSessionResponseMessage createSession(String name,
+ long channelID,
+ String username,
+ String password,
+ int minLargeMessageSize,
+ int incrementingVersion,
+ RemotingConnection remotingConnection,
+ boolean autoCommitSends,
+ boolean autoCommitAcks,
+ boolean preAcknowledge,
+ boolean xa,
+ int producerWindowSize) throws Exception
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#deployQueue(org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString, boolean, boolean)
+ */
+ public Queue deployQueue(SimpleString address,
+ SimpleString queueName,
+ SimpleString filterString,
+ boolean durable,
+ boolean temporary) throws Exception
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#destroyQueue(org.hornetq.utils.SimpleString, org.hornetq.core.server.ServerSession)
+ */
+ public void destroyQueue(SimpleString queueName, ServerSession session) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getAddressSettingsRepository()
+ */
+ public HierarchicalRepository<AddressSettings> getAddressSettingsRepository()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getClusterManager()
+ */
+ public ClusterManager getClusterManager()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getConfiguration()
+ */
+ public Configuration getConfiguration()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getConnectionCount()
+ */
+ public int getConnectionCount()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getExecutorFactory()
+ */
+ public ExecutorFactory getExecutorFactory()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getHornetQServerControl()
+ */
+ public HornetQServerControlImpl getHornetQServerControl()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getMBeanServer()
+ */
+ public MBeanServer getMBeanServer()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getManagementService()
+ */
+ public ManagementService getManagementService()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getNodeID()
+ */
+ public SimpleString getNodeID()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getPostOffice()
+ */
+ public PostOffice getPostOffice()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getQueueFactory()
+ */
+ public QueueFactory getQueueFactory()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getRemotingService()
+ */
+ public RemotingService getRemotingService()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getResourceManager()
+ */
+ public ResourceManager getResourceManager()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getSecurityManager()
+ */
+ public HornetQSecurityManager getSecurityManager()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getSecurityRepository()
+ */
+ public HierarchicalRepository<Set<Role>> getSecurityRepository()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getSession(java.lang.String)
+ */
+ public ServerSession getSession(String name)
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getSessions()
+ */
+ public Set<ServerSession> getSessions()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getSessions(java.lang.String)
+ */
+ public List<ServerSession> getSessions(String connectionID)
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getStorageManager()
+ */
+ public StorageManager getStorageManager()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#getVersion()
+ */
+ public Version getVersion()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#isInitialised()
+ */
+ public boolean isInitialised()
+ {
+
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#isStarted()
+ */
+ public boolean isStarted()
+ {
+
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#reattachSession(org.hornetq.core.remoting.RemotingConnection, java.lang.String, int)
+ */
+ public ReattachSessionResponseMessage reattachSession(RemotingConnection connection,
+ String name,
+ int lastReceivedCommandID) throws Exception
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#registerActivateCallback(org.hornetq.core.server.ActivateCallback)
+ */
+ public void registerActivateCallback(ActivateCallback callback)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#removeSession(java.lang.String)
+ */
+ public void removeSession(String name) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#unregisterActivateCallback(org.hornetq.core.server.ActivateCallback)
+ */
+ public void unregisterActivateCallback(ActivateCallback callback)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#start()
+ */
+ public void start() throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#stop()
+ */
+ public void stop() throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQServer#createReplicationEndpoint()
+ */
+ public ReplicationEndpoint createReplicationEndpoint()
+ {
+ return new ReplicationEndpointImpl(this);
+ }
+
+ }
}
16 years, 2 months
JBoss hornetq SVN: r8004 - trunk/tests/jms-tests/src/org/hornetq/jms/tests.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-28 16:30:13 -0400 (Mon, 28 Sep 2009)
New Revision: 8004
Modified:
trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java
Log:
Removing non-used test
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java 2009-09-28 18:08:09 UTC (rev 8003)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/ReferenceableTest.java 2009-09-28 20:30:13 UTC (rev 8004)
@@ -135,35 +135,6 @@
simpleSendReceive(cf, topic2);
}
-
-// public void testReferenceRAManagedCF() throws Exception
-// {
-// HornetQResourceAdapter ra = new HornetQResourceAdapter();
-//
-// HornetQRAManagedConnectionFactory mcf = new HornetQRAManagedConnectionFactory();
-//
-// mcf.setResourceAdapter(ra);
-//
-// ConnectionFactory racf = (ConnectionFactory)mcf.createConnectionFactory();
-//
-// Reference cfRef = ((Referenceable)racf).getReference();
-//
-// String factoryName = cfRef.getFactoryClassName();
-//
-// Class factoryClass = Class.forName(factoryName);
-//
-// ConnectionFactoryObjectFactory factory = (ConnectionFactoryObjectFactory)factoryClass.newInstance();
-//
-// Object instance = factory.getObjectInstance(cfRef, null, null, null);
-//
-// assertTrue(instance instanceof HornetQRAConnectionFactory);
-//
-// HornetQRAConnectionFactory racf2 = (HornetQRAConnectionFactory)instance;
-//
-// //TODO implement simpleSendReceive with test inside the container or wait until https://jira.jboss.org/jira/browse/HORNETQ-140 is done.
-// }
-//
-
protected void simpleSendReceive(ConnectionFactory cf, Destination dest) throws Exception
{
Connection conn = null;
16 years, 2 months
JBoss hornetq SVN: r8003 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-28 14:08:09 -0400 (Mon, 28 Sep 2009)
New Revision: 8003
Modified:
trunk/docs/user-manual/en/configuration-index.xml
trunk/docs/user-manual/en/connection-ttl.xml
Log:
changed default connection ttl in docs to 1 minute
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2009-09-28 17:24:07 UTC (rev 8002)
+++ trunk/docs/user-manual/en/configuration-index.xml 2009-09-28 18:08:09 UTC (rev 8003)
@@ -871,7 +871,7 @@
>connection-factory.connection-ttl</link></entry>
<entry>Long</entry>
<entry>the time to live (in ms) for connections</entry>
- <entry>5 * 60000</entry>
+ <entry>1 * 60000</entry>
</row>
<row>
<entry><link linkend="flow-control.rate.core.api"
Modified: trunk/docs/user-manual/en/connection-ttl.xml
===================================================================
--- trunk/docs/user-manual/en/connection-ttl.xml 2009-09-28 17:24:07 UTC (rev 8002)
+++ trunk/docs/user-manual/en/connection-ttl.xml 2009-09-28 18:08:09 UTC (rev 8003)
@@ -98,7 +98,7 @@
instance, or if you're deploying JMS connection factory instances direct into JNDI on
the server side, you can specify it in the xml config, using the parameter <literal
>connection-ttl</literal>.</para>
- <para>The default value for connection ttl is <literal>300000</literal>ms, i.e. 5 minutes. A
+ <para>The default value for connection ttl is <literal>60000</literal>ms, i.e. 1 minute. A
value of <literal>-1</literal> for <literal>ConnectionTTL</literal> means the server
will never time out the connection on the server side.</para>
<para>If you do not wish clients to be able to specify their own connection TTL, you can
@@ -151,7 +151,7 @@
deploying JMS connection factory instances direct into JNDI on the server side, you can
specify it in the <literal>hornetq-jms.xml </literal> configuration file, using the
parameter <literal>client-failure-check-period</literal>.</para>
- <para>The default value for client failure check period is <literal>5000</literal>ms, i.e. 5
+ <para>The default value for client failure check period is <literal>30000</literal>ms, i.e. 30
seconds. A value of <literal>-1</literal> means the client will never fail the
connection on the client side if no data is received from the server. Typically this is
much lower than connection TTL to allow clients to reconnect in case of transitory
16 years, 2 months
JBoss hornetq SVN: r8002 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-28 13:24:07 -0400 (Mon, 28 Sep 2009)
New Revision: 8002
Modified:
trunk/docs/user-manual/en/configuring-transports.xml
Log:
added note about binding acceptors to hosts
Modified: trunk/docs/user-manual/en/configuring-transports.xml
===================================================================
--- trunk/docs/user-manual/en/configuring-transports.xml 2009-09-28 16:24:46 UTC (rev 8001)
+++ trunk/docs/user-manual/en/configuring-transports.xml 2009-09-28 17:24:07 UTC (rev 8002)
@@ -203,10 +203,15 @@
<para><literal>hornetq.remoting.netty.host</literal>. This specified the host
name or IP address to connect to (when configuring a connector) or to listen
on (when configuring an acceptor). The default value for this property is
- <literal>localhost</literal>. When configuring acceptors, multiple hosts
- or IP addresses can be specified by separating them with commas.
- Note that if you want your servers
- accessible from other nodes, don't bind to localhost!</para>
+ <literal>localhost</literal>. When configuring acceptors, multiple hosts
+ or IP addresses can be specified by separating them with commas.</para>
+ <note>
+ <para>Don't forget to specify a host name or ip address! If you want your
+ server able to accept connections from other nodes you must specify a
+ hostname or ip address at which the acceptor will bind and listen for
+ incoming connections. The default is localhost which of course is not
+ accessible from remote nodes!</para>
+ </note>
</listitem>
<listitem>
<para><literal>hornetq.remoting.netty.port</literal>. This specified the port to
16 years, 2 months
JBoss hornetq SVN: r8001 - trunk/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-28 12:24:46 -0400 (Mon, 28 Sep 2009)
New Revision: 8001
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Log:
reduced default ttl to 1 minute
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-09-28 16:19:13 UTC (rev 8000)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-09-28 16:24:46 UTC (rev 8001)
@@ -63,9 +63,8 @@
public static final long DEFAULT_CLIENT_FAILURE_CHECK_PERIOD = 30000;
- // 5 minutes - normally this should be much higher than ping period, this allows clients to re-attach on live
- // or backup without fear of session having already been closed when connection having timed out.
- public static final long DEFAULT_CONNECTION_TTL = 5 * 60 * 1000;
+ // 1 minute - this should be higher than ping period
+ public static final long DEFAULT_CONNECTION_TTL = 1 * 60 * 1000;
// Any message beyond this size is considered a large message (to be sent in chunks)
public static final int DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100 * 1024;
16 years, 2 months
JBoss hornetq SVN: r8000 - in trunk/tests/src/org/hornetq/tests: stress/journal and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-28 12:19:13 -0400 (Mon, 28 Sep 2009)
New Revision: 8000
Added:
trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java
Removed:
trunk/tests/src/org/hornetq/tests/stress/journal/ValidateTransactionHealthTest.java
Log:
Moving a stress test to integration test.
(ValidateTransactionHealthTest is an important test. So i am moving it)
Copied: trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java (from rev 7999, trunk/tests/src/org/hornetq/tests/stress/journal/ValidateTransactionHealthTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java 2009-09-28 16:19:13 UTC (rev 8000)
@@ -0,0 +1,247 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.journal;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.tests.stress.journal.remote.RemoteJournalAppender;
+import org.hornetq.tests.util.SpawnedVMSupport;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ *
+ * This test spawns a remote VM, as we want to "crash" the VM right after the journal is filled with data
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ *
+ */
+public class ValidateTransactionHealthTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testAIO() throws Exception
+ {
+ internalTest("aio", getTestDir(), 10000, 100, true, true, 1);
+ }
+
+ public void testAIOHugeTransaction() throws Exception
+ {
+ internalTest("aio", getTestDir(), 10000, 10000, true, true, 1);
+ }
+
+ public void testAIOMultiThread() throws Exception
+ {
+ internalTest("aio", getTestDir(), 1000, 100, true, true, 10);
+ }
+
+ public void testAIONonTransactionalNoExternalProcess() throws Exception
+ {
+ internalTest("aio", getTestDir(), 1000, 0, true, false, 10);
+ }
+
+ public void testNIO() throws Exception
+ {
+ internalTest("nio", getTestDir(), 10000, 100, true, true, 1);
+ }
+
+ public void testNIOHugeTransaction() throws Exception
+ {
+ internalTest("nio", getTestDir(), 10000, 10000, true, true, 1);
+ }
+
+ public void testNIOMultiThread() throws Exception
+ {
+ internalTest("nio", getTestDir(), 1000, 100, true, true, 10);
+ }
+
+ public void testNIONonTransactional() throws Exception
+ {
+ internalTest("nio", getTestDir(), 10000, 0, true, true, 1);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ File file = new File(getTestDir());
+ deleteDirectory(file);
+ file.mkdir();
+ }
+
+ // Private -------------------------------------------------------
+
+ private void internalTest(final String type,
+ final String journalDir,
+ final long numberOfRecords,
+ final int transactionSize,
+ final boolean append,
+ final boolean externalProcess,
+ final int numberOfThreads) throws Exception
+ {
+ try
+ {
+ if (type.equals("aio") && !AsynchronousFileImpl.isLoaded())
+ {
+ // Using System.out as this output will go towards junit report
+ System.out.println("AIO not found, test being ignored on this platform");
+ return;
+ }
+
+ // This property could be set to false for debug purposes.
+ if (append)
+ {
+ if (externalProcess)
+ {
+ Process process = SpawnedVMSupport.spawnVM(RemoteJournalAppender.class.getCanonicalName(),
+ type,
+ journalDir,
+ Long.toString(numberOfRecords),
+ Integer.toString(transactionSize),
+ Integer.toString(numberOfThreads));
+ process.waitFor();
+ assertEquals(RemoteJournalAppender.OK, process.exitValue());
+ }
+ else
+ {
+ JournalImpl journal = RemoteJournalAppender.appendData(type,
+ journalDir,
+ numberOfRecords,
+ transactionSize,
+ numberOfThreads);
+ journal.stop();
+ }
+ }
+
+ reload(type, journalDir, numberOfRecords, numberOfThreads);
+ }
+ finally
+ {
+ File file = new File(journalDir);
+ deleteDirectory(file);
+ }
+ }
+
+ private void reload(final String type, final String journalDir, final long numberOfRecords, final int numberOfThreads) throws Exception
+ {
+ JournalImpl journal = RemoteJournalAppender.createJournal(type, journalDir);
+
+ journal.start();
+ Loader loadTest = new Loader(numberOfRecords);
+ journal.load(loadTest);
+ assertEquals(numberOfRecords * numberOfThreads, loadTest.numberOfAdds);
+ assertEquals(0, loadTest.numberOfPreparedTransactions);
+ assertEquals(0, loadTest.numberOfUpdates);
+ assertEquals(0, loadTest.numberOfDeletes);
+
+ journal.stop();
+
+ if (loadTest.ex != null)
+ {
+ throw loadTest.ex;
+ }
+ }
+
+ // Inner classes -------------------------------------------------
+
+ class Loader implements LoaderCallback
+ {
+ int numberOfPreparedTransactions = 0;
+
+ int numberOfAdds = 0;
+
+ int numberOfDeletes = 0;
+
+ int numberOfUpdates = 0;
+
+ long expectedRecords = 0;
+
+ Exception ex = null;
+
+ long lastID = 0;
+
+ public Loader(final long expectedRecords)
+ {
+ this.expectedRecords = expectedRecords;
+ }
+
+ public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
+ {
+ numberOfPreparedTransactions++;
+
+ }
+
+ public void addRecord(final RecordInfo info)
+ {
+ if (info.id == lastID)
+ {
+ System.out.println("id = " + info.id + " last id = " + lastID);
+ }
+
+ ByteBuffer buffer = ByteBuffer.wrap(info.data);
+ long recordValue = buffer.getLong();
+
+ if (recordValue != info.id)
+ {
+ ex = new Exception("Content not as expected (" + recordValue + " != " + info.id + ")");
+
+ }
+
+ lastID = info.id;
+ numberOfAdds++;
+
+ }
+
+ public void deleteRecord(final long id)
+ {
+ numberOfDeletes++;
+
+ }
+
+ public void updateRecord(final RecordInfo info)
+ {
+ numberOfUpdates++;
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.TransactionFailureCallback#failedTransaction(long, java.util.List, java.util.List)
+ */
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ }
+
+ }
+
+}
Deleted: trunk/tests/src/org/hornetq/tests/stress/journal/ValidateTransactionHealthTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/ValidateTransactionHealthTest.java 2009-09-28 15:07:36 UTC (rev 7999)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/ValidateTransactionHealthTest.java 2009-09-28 16:19:13 UTC (rev 8000)
@@ -1,247 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.stress.journal;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.journal.LoaderCallback;
-import org.hornetq.core.journal.PreparedTransactionInfo;
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.tests.stress.journal.remote.RemoteJournalAppender;
-import org.hornetq.tests.util.SpawnedVMSupport;
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- *
- * This test spawns a remote VM, as we want to "crash" the VM right after the journal is filled with data
- *
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
- *
- */
-public class ValidateTransactionHealthTest extends UnitTestCase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testAIO() throws Exception
- {
- internalTest("aio", getTestDir(), 10000, 100, true, true, 1);
- }
-
- public void testAIOHugeTransaction() throws Exception
- {
- internalTest("aio", getTestDir(), 10000, 10000, true, true, 1);
- }
-
- public void testAIOMultiThread() throws Exception
- {
- internalTest("aio", getTestDir(), 1000, 100, true, true, 10);
- }
-
- public void testAIONonTransactionalNoExternalProcess() throws Exception
- {
- internalTest("aio", getTestDir(), 1000, 0, true, false, 10);
- }
-
- public void testNIO() throws Exception
- {
- internalTest("nio", getTestDir(), 10000, 100, true, true, 1);
- }
-
- public void testNIOHugeTransaction() throws Exception
- {
- internalTest("nio", getTestDir(), 10000, 10000, true, true, 1);
- }
-
- public void testNIOMultiThread() throws Exception
- {
- internalTest("nio", getTestDir(), 1000, 100, true, true, 10);
- }
-
- public void testNIONonTransactional() throws Exception
- {
- internalTest("nio", getTestDir(), 10000, 0, true, true, 1);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- File file = new File(getTestDir());
- deleteDirectory(file);
- file.mkdir();
- }
-
- // Private -------------------------------------------------------
-
- private void internalTest(final String type,
- final String journalDir,
- final long numberOfRecords,
- final int transactionSize,
- final boolean append,
- final boolean externalProcess,
- final int numberOfThreads) throws Exception
- {
- try
- {
- if (type.equals("aio") && !AsynchronousFileImpl.isLoaded())
- {
- // Using System.out as this output will go towards junit report
- System.out.println("AIO not found, test being ignored on this platform");
- return;
- }
-
- // This property could be set to false for debug purposes.
- if (append)
- {
- if (externalProcess)
- {
- Process process = SpawnedVMSupport.spawnVM(RemoteJournalAppender.class.getCanonicalName(),
- type,
- journalDir,
- Long.toString(numberOfRecords),
- Integer.toString(transactionSize),
- Integer.toString(numberOfThreads));
- process.waitFor();
- assertEquals(RemoteJournalAppender.OK, process.exitValue());
- }
- else
- {
- JournalImpl journal = RemoteJournalAppender.appendData(type,
- journalDir,
- numberOfRecords,
- transactionSize,
- numberOfThreads);
- journal.stop();
- }
- }
-
- reload(type, journalDir, numberOfRecords, numberOfThreads);
- }
- finally
- {
- File file = new File(journalDir);
- deleteDirectory(file);
- }
- }
-
- private void reload(final String type, final String journalDir, final long numberOfRecords, final int numberOfThreads) throws Exception
- {
- JournalImpl journal = RemoteJournalAppender.createJournal(type, journalDir);
-
- journal.start();
- Loader loadTest = new Loader(numberOfRecords);
- journal.load(loadTest);
- assertEquals(numberOfRecords * numberOfThreads, loadTest.numberOfAdds);
- assertEquals(0, loadTest.numberOfPreparedTransactions);
- assertEquals(0, loadTest.numberOfUpdates);
- assertEquals(0, loadTest.numberOfDeletes);
-
- journal.stop();
-
- if (loadTest.ex != null)
- {
- throw loadTest.ex;
- }
- }
-
- // Inner classes -------------------------------------------------
-
- class Loader implements LoaderCallback
- {
- int numberOfPreparedTransactions = 0;
-
- int numberOfAdds = 0;
-
- int numberOfDeletes = 0;
-
- int numberOfUpdates = 0;
-
- long expectedRecords = 0;
-
- Exception ex = null;
-
- long lastID = 0;
-
- public Loader(final long expectedRecords)
- {
- this.expectedRecords = expectedRecords;
- }
-
- public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
- {
- numberOfPreparedTransactions++;
-
- }
-
- public void addRecord(final RecordInfo info)
- {
- if (info.id == lastID)
- {
- System.out.println("id = " + info.id + " last id = " + lastID);
- }
-
- ByteBuffer buffer = ByteBuffer.wrap(info.data);
- long recordValue = buffer.getLong();
-
- if (recordValue != info.id)
- {
- ex = new Exception("Content not as expected (" + recordValue + " != " + info.id + ")");
-
- }
-
- lastID = info.id;
- numberOfAdds++;
-
- }
-
- public void deleteRecord(final long id)
- {
- numberOfDeletes++;
-
- }
-
- public void updateRecord(final RecordInfo info)
- {
- numberOfUpdates++;
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.TransactionFailureCallback#failedTransaction(long, java.util.List, java.util.List)
- */
- public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
- {
- }
-
- }
-
-}
16 years, 2 months
JBoss hornetq SVN: r7999 - in trunk: src/config and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-09-28 11:07:36 -0400 (Mon, 28 Sep 2009)
New Revision: 7999
Modified:
trunk/docs/user-manual/en/appserver-integration.xml
trunk/src/config/ra.xml
Log:
removed transaction manager refrences in RA adapter
Modified: trunk/docs/user-manual/en/appserver-integration.xml
===================================================================
--- trunk/docs/user-manual/en/appserver-integration.xml 2009-09-28 11:00:38 UTC (rev 7998)
+++ trunk/docs/user-manual/en/appserver-integration.xml 2009-09-28 15:07:36 UTC (rev 7999)
@@ -383,17 +383,6 @@
key=val;key=val; and will be specific to the connector used</entry>
</row>
<row>
- <entry>TransactionManagerLocatorClass</entry>
- <entry>String</entry>
- <entry>The class to use to load the transaction manager</entry>
- </row>
- <row>
- <entry>TransactionManagerLocatorMethod</entry>
- <entry>String</entry>
- <entry>The method to invoke on the TransactionManagerLocatorClass to get
- the transaction manager</entry>
- </row>
- <row>
<entry>useLocalTx</entry>
<entry>boolean</entry>
<entry>True will enable local transaction optimisation.</entry>
Modified: trunk/src/config/ra.xml
===================================================================
--- trunk/src/config/ra.xml 2009-09-28 11:00:38 UTC (rev 7998)
+++ trunk/src/config/ra.xml 2009-09-28 15:07:36 UTC (rev 7999)
@@ -53,12 +53,6 @@
</config-property>
<!--
<config-property>
- <description>The class to use for locatingthe transactionmanager</description>
- <config-property-name>TransactionManagerLocatorClass</config-property-name>
- <config-property-type>java.lang.String</config-property-type>
- <config-property-value>org.hornetq.ra.inflow.JBoss5TransactionManagerLocator</config-property-value>
- </config-property>
- <config-property>
<description>The method to use for locatingthe transactionmanager</description>
<config-property-name>TransactionManagerLocatorMethod</config-property-name>
<config-property-type>java.lang.String</config-property-type>
16 years, 2 months
JBoss hornetq SVN: r7998 - in trunk/src/main/org/hornetq/ra: inflow and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-09-28 07:00:38 -0400 (Mon, 28 Sep 2009)
New Revision: 7998
Modified:
trunk/src/main/org/hornetq/ra/HornetQRAProperties.java
trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
Log:
removed transaction manager refrences in RA adapter
Modified: trunk/src/main/org/hornetq/ra/HornetQRAProperties.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAProperties.java 2009-09-28 08:59:11 UTC (rev 7997)
+++ trunk/src/main/org/hornetq/ra/HornetQRAProperties.java 2009-09-28 11:00:38 UTC (rev 7998)
@@ -47,10 +47,6 @@
/** Use Local TX instead of XA */
private Boolean localTx = false;
- private String transactionManagerLocatorClass = "org.hornetq.integration.jboss.tm.JBoss5TransactionManagerLocator";
-
- private String transactionManagerLocatorMethod = "getTm";
-
/**
* Constructor
*/
@@ -189,24 +185,4 @@
return useXA != null && useXA;
}
-
- public void setTransactionManagerLocatorClass(String transactionManagerLocatorClass)
- {
- this.transactionManagerLocatorClass = transactionManagerLocatorClass;
- }
-
- public String getTransactionManagerLocatorClass()
- {
- return transactionManagerLocatorClass;
- }
-
- public void setTransactionManagerLocatorMethod(String transactionManagerLocatorMethod)
- {
- this.transactionManagerLocatorMethod = transactionManagerLocatorMethod;
- }
-
- public String getTransactionManagerLocatorMethod()
- {
- return transactionManagerLocatorMethod;
- }
}
Modified: trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2009-09-28 08:59:11 UTC (rev 7997)
+++ trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2009-09-28 11:00:38 UTC (rev 7998)
@@ -212,36 +212,6 @@
log.info("HornetQ resource adapter stopped");
}
- public void setTransactionManagerLocatorClass(final String transactionManagerLocatorClass)
- {
- if (trace)
- {
- log.trace("setTransactionManagerLocatorClass(" + transactionManagerLocatorClass + ")");
- }
-
- raProperties.setTransactionManagerLocatorClass(transactionManagerLocatorClass);
- }
-
- public String getTransactionManagerLocatorClass()
- {
- return raProperties.getTransactionManagerLocatorClass();
- }
-
- public void setTransactionManagerLocatorMethod(final String transactionManagerLocatorMethod)
- {
- if (trace)
- {
- log.trace("setTransactionManagerLocatorMethod(" + transactionManagerLocatorMethod + ")");
- }
-
- raProperties.setTransactionManagerLocatorMethod(transactionManagerLocatorMethod);
- }
-
- public String getTransactionManagerLocatorMethod()
- {
- return raProperties.getTransactionManagerLocatorMethod();
- }
-
public void setConnectorClassName(final String connectorClassName)
{
if (trace)
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2009-09-28 08:59:11 UTC (rev 7997)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2009-09-28 11:00:38 UTC (rev 7998)
@@ -97,11 +97,6 @@
private HornetQDestination destination;
- /**
- * The TransactionManager
- */
- private TransactionManager tm;
-
private final List<HornetQMessageHandler> handlers = new ArrayList<HornetQMessageHandler>();
private org.hornetq.jms.client.HornetQConnectionFactory factory;
@@ -209,37 +204,6 @@
}
/**
- * Get the transaction manager
- *
- * @return The value
- */
- public TransactionManager getTransactionManager()
- {
- if (trace)
- {
- log.trace("getTransactionManager()");
- }
-
- if (tm == null)
- {
- try
- {
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- Class aClass = loader.loadClass(ra.getTransactionManagerLocatorClass());
- Object o = aClass.newInstance();
- Method m = aClass.getMethod(ra.getTransactionManagerLocatorMethod());
- tm = (TransactionManager) m.invoke(o);
- }
- catch (Exception e)
- {
- log.warn("unable to create TransactionManager from " + ra.getTransactionManagerLocatorClass() + "." + ra.getTransactionManagerLocatorMethod());
- }
- }
-
- return tm;
- }
-
- /**
* Is the destination a topic
*
* @return The value
16 years, 2 months