Author: igarashitm
Date: 2011-12-01 15:07:29 -0500 (Thu, 01 Dec 2011)
New Revision: 11807
Added:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/impl/ReplicationError.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupAuthenticationTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java
Removed:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/RemoteFailoverTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagedFailverOverTest.java
Modified:
branches/HORNETQ-316/
branches/HORNETQ-316/etc/org.eclipse.jdt.core.prefs
branches/HORNETQ-316/etc/org.eclipse.jdt.ui.prefs
branches/HORNETQ-316/hornetq-commons/src/main/java/org/hornetq/api/core/HornetQException.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/ClusterConnectionConfiguration.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/Channel.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/ClusterConnection.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-316/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/persistence/JMSStorageManager.java
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
branches/HORNETQ-316/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
branches/HORNETQ-316/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-316/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
branches/HORNETQ-316/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/JMSTest.java
branches/HORNETQ-316/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFailoverTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedAsynchronousFailoverTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageWithDelayFailoverTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AcceptorControlTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AcceptorControlUsingCoreTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AddressControlTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/HORNETQ-316/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
Log:
svn merge -r r11685:r11800
https://svn.jboss.org/repos/hornetq/trunk
(resolve some conflict by hand)
Property changes on: branches/HORNETQ-316
___________________________________________________________________
Modified: svn:mergeinfo
- /branches/HORNETQ-720_Replication:10878-11528
/branches/STOMP11:11225-11517
+ /branches/HORNETQ-720_Replication:10878-11528
/branches/STOMP11:11225-11517
/trunk:11696-11800
Modified: branches/HORNETQ-316/etc/org.eclipse.jdt.core.prefs
===================================================================
--- branches/HORNETQ-316/etc/org.eclipse.jdt.core.prefs 2011-12-01 18:03:10 UTC (rev
11806)
+++ branches/HORNETQ-316/etc/org.eclipse.jdt.core.prefs 2011-12-01 20:07:29 UTC (rev
11807)
@@ -1,4 +1,4 @@
-#Tue Jul 05 17:04:57 CEST 2011
+#Tue Nov 08 17:31:37 CET 2011
eclipse.preferences.version=1
org.eclipse.jdt.core.codeComplete.argumentPrefixes=
org.eclipse.jdt.core.codeComplete.argumentSuffixes=
@@ -34,16 +34,18 @@
org.eclipse.jdt.core.formatter.alignment_for_conditional_expression=18
org.eclipse.jdt.core.formatter.alignment_for_enum_constants=82
org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer=82
-org.eclipse.jdt.core.formatter.alignment_for_method_declaration=32
+org.eclipse.jdt.core.formatter.alignment_for_method_declaration=16
org.eclipse.jdt.core.formatter.alignment_for_multiple_fields=16
org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration=18
-org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration=82
+org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration=18
+org.eclipse.jdt.core.formatter.alignment_for_resources_in_try=80
org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation=82
org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration=16
org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration=82
org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration=16
org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration=82
-org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration=82
+org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration=18
+org.eclipse.jdt.core.formatter.alignment_for_union_type_in_multicatch=16
org.eclipse.jdt.core.formatter.blank_lines_after_imports=1
org.eclipse.jdt.core.formatter.blank_lines_after_package=1
org.eclipse.jdt.core.formatter.blank_lines_before_field=0
@@ -175,12 +177,14 @@
org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression=do
not insert
org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch=do not insert
org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized=do not
insert
+org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_try=do not insert
org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while=do not insert
org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator=do not insert
org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator=do not insert
org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional=insert
org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard=do not insert
org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for=insert
+org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_try_resources=insert
org.eclipse.jdt.core.formatter.insert_space_after_unary_operator=do not insert
org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter=insert
org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator=insert
@@ -204,6 +208,7 @@
org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression=do
not insert
org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch=do not insert
org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized=do not
insert
+org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_try=do not insert
org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while=do not insert
org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert=insert
org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case=do not insert
@@ -259,6 +264,7 @@
org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression=do
not insert
org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch=insert
org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized=insert
+org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_try=insert
org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while=insert
org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return=insert
org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw=insert
@@ -268,6 +274,7 @@
org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard=do not insert
org.eclipse.jdt.core.formatter.insert_space_before_semicolon=do not insert
org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for=do not insert
+org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_try_resources=do not
insert
org.eclipse.jdt.core.formatter.insert_space_before_unary_operator=do not insert
org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference=do
not insert
org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer=do
not insert
@@ -294,4 +301,5 @@
org.eclipse.jdt.core.formatter.use_on_off_tags=true
org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations=false
org.eclipse.jdt.core.formatter.wrap_before_binary_operator=false
+org.eclipse.jdt.core.formatter.wrap_before_or_operator_multicatch=true
org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested=true
Modified: branches/HORNETQ-316/etc/org.eclipse.jdt.ui.prefs
===================================================================
--- branches/HORNETQ-316/etc/org.eclipse.jdt.ui.prefs 2011-12-01 18:03:10 UTC (rev 11806)
+++ branches/HORNETQ-316/etc/org.eclipse.jdt.ui.prefs 2011-12-01 20:07:29 UTC (rev 11807)
@@ -1,4 +1,4 @@
-#Tue Jun 28 17:14:52 CEST 2011
+#Thu Nov 10 13:53:54 CET 2011
cleanup.add_default_serial_version_id=false
cleanup.add_generated_serial_version_id=true
cleanup.add_missing_annotations=true
@@ -64,6 +64,7 @@
org.eclipse.jdt.ui.ondemandthreshold=9999
org.eclipse.jdt.ui.overrideannotation=true
org.eclipse.jdt.ui.staticondemandthreshold=9999
+org.eclipse.jdt.ui.text.custom_code_templates=<?xml version\="1.0"
encoding\="UTF-8" standalone\="no"?><templates><template
autoinsert\="true" context\="gettercomment_context"
deleted\="false" description\="Comment for getter method"
enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.gettercomment"
name\="gettercomment">/**\n * @return the ${bare_field_name}\n
*/</template><template autoinsert\="true"
context\="settercomment_context" deleted\="false"
description\="Comment for setter method" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.settercomment"
name\="settercomment">/**\n * @param ${param} the ${bare_field_name} to set\n
*/</template><template autoinsert\="true"
context\="constructorcomment_context" deleted\="false"
description\="Comment for created constructors" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.constructorcomment"
name\="constructorcomment">/**\n * ${tags}\n */</template><template
autoinsert\="true" context\="filecomment!
_context" deleted\="false" description\="Comment for created Java
files" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.filecomment"
name\="filecomment">/**\n * \n */</template><template
autoinsert\="false" context\="typecomment_context"
deleted\="false" description\="Comment for created types"
enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.typecomment"
name\="typecomment">/**\n * ${tags}\n */</template><template
autoinsert\="true" context\="fieldcomment_context"
deleted\="false" description\="Comment for fields"
enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.fieldcomment"
name\="fieldcomment">/**\n * \n */</template><template
autoinsert\="true" context\="methodcomment_context"
deleted\="false" description\="Comment for non-overriding methods"
enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.methodcomment"
name\="methodcomment">/**\n * ${tags}\n */</template><template
autoinsert\="false" context\="overridecomment_context" delete!
d\="false" description\="Comment for overriding methods" enabl!
ed\="tru
e" id\="org.eclipse.jdt.ui.text.codetemplates.overridecomment"
name\="overridecomment"/><template autoinsert\="true"
context\="delegatecomment_context" deleted\="false"
description\="Comment for delegate methods" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.delegatecomment"
name\="delegatecomment">/**\n * ${tags}\n * ${see_to_target}\n
*/</template><template autoinsert\="true"
context\="newtype_context" deleted\="false" description\="Newly
created files" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.newtype"
name\="newtype">${filecomment}\n${package_declaration}\n\n${typecomment}\n${type_declaration}</template><template
autoinsert\="true" context\="classbody_context"
deleted\="false" description\="Code in new class type bodies"
enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.classbody"
name\="classbody">\n</template><template
autoinsert\="true" context\="interfacebody_context"
deleted\="false" description\="Code in new interface type bod!
ies" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.interfacebody"
name\="interfacebody">\n</template><template
autoinsert\="true" context\="enumbody_context"
deleted\="false" description\="Code in new enum type bodies"
enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.enumbody"
name\="enumbody">\n</template><template autoinsert\="true"
context\="annotationbody_context" deleted\="false"
description\="Code in new annotation type bodies" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.annotationbody"
name\="annotationbody">\n</template><template
autoinsert\="true" context\="catchblock_context"
deleted\="false" description\="Code in new catch blocks"
enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.catchblock"
name\="catchblock">// ${todo} Auto-generated catch
block\n${exception_var}.printStackTrace();</template><template
autoinsert\="true" context\="methodbody_context"
deleted\="false" description\="Code in created method stubs" enabl!
ed\="true" id\="org.eclipse.jdt.ui.text.codetemplates.methodbo!
dy" name
\="methodbody">// ${todo} Auto-generated method
stub\n${body_statement}</template><template autoinsert\="true"
context\="constructorbody_context" deleted\="false"
description\="Code in created constructor stubs" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.constructorbody"
name\="constructorbody">${body_statement}\n// ${todo} Auto-generated
constructor stub</template><template autoinsert\="true"
context\="getterbody_context" deleted\="false" description\="Code
in created getters" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.getterbody"
name\="getterbody">return ${field};</template><template
autoinsert\="true" context\="setterbody_context"
deleted\="false" description\="Code in created setters"
enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.setterbody"
name\="setterbody">${field} \= ${param};</template></templates>
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
Modified:
branches/HORNETQ-316/hornetq-commons/src/main/java/org/hornetq/api/core/HornetQException.java
===================================================================
---
branches/HORNETQ-316/hornetq-commons/src/main/java/org/hornetq/api/core/HornetQException.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-commons/src/main/java/org/hornetq/api/core/HornetQException.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -136,11 +136,11 @@
/**
- * A Session Metadata was set in duplication
+ * A Session Metadata was set in duplication
*/
public static final int DUPLICATE_METADATA = 114;
-
+
// Native Error codes ----------------------------------------------
/**
@@ -193,6 +193,8 @@
*/
public static final int NATIVE_ERROR_AIO_FULL = 211;
+ public static final int ALREADY_REPLICATING = 212;
+
private int code;
public HornetQException()
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -19,19 +19,21 @@
import org.hornetq.utils.UUIDGenerator;
/**
- * A TransportConfiguration is used by a client to specify a connections to a server and
its backup if one exists.<br><br>
+ * A TransportConfiguration is used by a client to specify connections to a server and
its backup if
+ * one exists.
+ * <p>
+ * Typically the constructors take the class name and parameters for needed to create
the
+ * connection. These will be different dependent on which connector is being used, i.e.
Netty or
+ * InVM etc. For example:
* <p/>
- * Typically the constructors take the class name and parameters for needed to create the
connection. These will be
- * different dependent on which connector is being used, i.e. Netty or InVM etc. For
example:<br><br>
- * <p/>
- * <code>
- * HashMap<String, Object> map = new HashMap<String, Object>();<br>
- * map.put("host", "localhost");<br>
- * map.put("port", 5445);<br>
- * TransportConfiguration config = new
TransportConfiguration(InVMConnectorFactory.class.getName(), map); <br>
- * ClientSessionFactory sf = new ClientSessionFactoryImpl(config); <br>
- * </code><br><br>
- *
+ *
+ * <pre>
+ * HashMap<String, Object> map = new HashMap<String,
Object>();
+ * map.put("host", "localhost");
+ * map.put("port", 5445);
+ * TransportConfiguration config = new
TransportConfiguration(InVMConnectorFactory.class.getName(), map);
+ * ClientSessionFactory sf = new ClientSessionFactoryImpl(config);
+ * </pre>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*/
public class TransportConfiguration implements Serializable
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -50,8 +50,6 @@
// Attributes
-----------------------------------------------------------------------------------
- private final boolean trace = ClientProducerImpl.log.isTraceEnabled();
-
private final SimpleString address;
private final ClientSessionInternal session;
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -81,8 +81,6 @@
// Constants
//
------------------------------------------------------------------------------------
- private static final long serialVersionUID = 2512460695662741413L;
-
private static final Logger log = Logger.getLogger(ClientSessionFactoryImpl.class);
private static final boolean isTrace = ClientSessionFactoryImpl.log.isTraceEnabled();
@@ -1410,7 +1408,7 @@
}
}
- private class Channel0Handler implements ChannelHandler
+ private final class Channel0Handler implements ChannelHandler
{
private final CoreRemotingConnection conn;
@@ -1500,9 +1498,12 @@
serverLocator.notifyNodeUp(topMessage.getUniqueEventID(),
topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
}
}
+ else if (type == PacketImpl.BACKUP_REGISTRATION_FAILED)
+ {
+ // no-op
+ }
}
-
}
public class CloseRunnable implements Runnable
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -109,8 +109,6 @@
private static final Logger log = Logger.getLogger(ClientSessionImpl.class);
- private final boolean trace = ClientSessionImpl.log.isTraceEnabled();
-
// Attributes
----------------------------------------------------------------------------
private final Map<String, String> metadata = new HashMap<String,
String>();
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -26,10 +26,6 @@
* A ClusterConnectionConfiguration
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- * Created 13 Jan 2009 09:42:17
- *
- *
*/
public class ClusterConnectionConfiguration implements Serializable
{
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -87,8 +87,6 @@
void pageWrite(PagedMessage message, int pageNumber);
- boolean isReplicated();
-
void afterCompleteOperations(IOAsyncTask run);
/** Block until the operations are done.
@@ -269,4 +267,11 @@
ServerMessage message,
RoutingContext ctx,
RouteContextList listCtx) throws Exception;
+
+ /**
+ * Stops the replication of data from the live to the backup.
+ * <p>
+ * Typical scenario is a broken connection.
+ */
+ void stopReplication();
}
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -17,6 +17,7 @@
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.security.AccessController;
+import java.security.InvalidParameterException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
@@ -107,13 +108,17 @@
import org.hornetq.utils.XidCodecSupport;
/**
- *
- * A JournalStorageManager
- *
+ * Controls access to the journals and other storage files such as the ones used to store
pages and
+ * large messages. This class must control writing of any non-transient data, as it is
the key point
+ * for synchronizing a replicating backup server.
+ * <p>
+ * Notice that, turning on and off replication (on the live server side) is _mostly_ a
matter of
+ * using {@link ReplicatedJournal}s instead of regular {@link JournalImpl}, and sync the
existing
+ * data.
+ * <p>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
- *
*/
public class JournalStorageManager implements StorageManager
{
@@ -137,7 +142,7 @@
// Message journal record types
// This is used when a large message is created but not yet stored on the system.
- // We use this to avoid temporary files missing
+ // Used to avoid temporary files missing
public static final byte ADD_LARGE_MESSAGE_PENDING = 29;
private static final byte ADD_LARGE_MESSAGE = 30;
@@ -185,13 +190,15 @@
return MESSAGES;
if (BINDINGS.typeByte == type)
return BINDINGS;
- throw new RuntimeException("invalid byte");
+ throw new InvalidParameterException("invalid byte: " + type);
}
}
- private Journal messageJournal;
- private Journal bindingsJournal;
- private final SequentialFileFactory largeMessagesFactory;
+ private Journal messageJournal;
+ private Journal bindingsJournal;
+ private final Journal originalMessageJournal;
+ private final Journal originalBindingsJournal;
+ private final SequentialFileFactory largeMessagesFactory;
private volatile boolean started;
@@ -230,18 +237,10 @@
public JournalStorageManager(final Configuration config, final ExecutorFactory
executorFactory,
final IOCriticalErrorListener criticalErrorListener)
{
- this(config, executorFactory, null, criticalErrorListener);
- }
-
- public JournalStorageManager(final Configuration config, final ExecutorFactory
executorFactory,
- final ReplicationManager replicator, final
IOCriticalErrorListener criticalErrorListener)
- {
this.executorFactory = executorFactory;
executor = executorFactory.getExecutor();
- this.replicator = replicator;
-
if (config.getJournalType() != JournalType.NIO && config.getJournalType()
!= JournalType.ASYNCIO)
{
throw new IllegalArgumentException("Only NIO and AsyncIO are supported
journals");
@@ -269,19 +268,13 @@
"bindings",
1);
- if (replicator != null)
- {
- bindingsJournal = new ReplicatedJournal((byte)0, localBindings,
replicator);
- }
- else
- {
- bindingsJournal = localBindings;
- }
+ bindingsJournal = localBindings;
+ originalBindingsJournal = localBindings;
- if (journalDir == null)
- {
- throw new NullPointerException("journal-dir is null");
- }
+ if (journalDir == null)
+ {
+ throw new NullPointerException("journal-dir is null");
+ }
createJournalDir = config.isCreateJournalDir();
@@ -327,14 +320,8 @@
config.getJournalType() ==
JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
: config.getJournalMaxIO_NIO());
- if (replicator != null)
- {
- messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
- }
- else
- {
- messageJournal = localMessage;
- }
+ messageJournal = localMessage;
+ originalMessageJournal = localMessage;
largeMessagesDirectory = config.getLargeMessagesDirectory();
largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false,
criticalErrorListener);
@@ -375,8 +362,8 @@
JournalFile[] messageFiles = null;
JournalFile[] bindingsFiles = null;
- final Journal localMessageJournal = messageJournal;
- final Journal localBindingsJournal = bindingsJournal;
+ try
+ {
Map<String, Long> largeMessageFilesToSync;
Map<SimpleString, Collection<Integer>> pageFilesToSync;
@@ -384,15 +371,15 @@
try
{
replicator = replicationManager;
- localMessageJournal.synchronizationLock();
- localBindingsJournal.synchronizationLock();
+ originalMessageJournal.synchronizationLock();
+ originalBindingsJournal.synchronizationLock();
try
{
pagingManager.lock();
try
{
- messageFiles = prepareJournalForCopy(localMessageJournal,
JournalContent.MESSAGES);
- bindingsFiles = prepareJournalForCopy(localBindingsJournal,
JournalContent.BINDINGS);
+ messageFiles = prepareJournalForCopy(originalMessageJournal,
JournalContent.MESSAGES, nodeID);
+ bindingsFiles = prepareJournalForCopy(originalBindingsJournal,
JournalContent.BINDINGS, nodeID);
pageFilesToSync = getPageInformationForSync(pagingManager);
largeMessageFilesToSync = getLargeMessageInformation();
}
@@ -403,11 +390,11 @@
}
finally
{
- localMessageJournal.synchronizationUnlock();
- localBindingsJournal.synchronizationUnlock();
+ originalMessageJournal.synchronizationUnlock();
+ originalBindingsJournal.synchronizationUnlock();
}
- bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal,
replicator);
- messageJournal = new ReplicatedJournal((byte)1, localMessageJournal,
replicator);
+ bindingsJournal = new ReplicatedJournal(((byte)0), originalBindingsJournal,
replicator);
+ messageJournal = new ReplicatedJournal((byte)1, originalMessageJournal,
replicator);
}
finally
{
@@ -428,11 +415,36 @@
}
finally
{
+ storageManagerLock.writeLock().unlock();
+ }
+ }
+ catch (Exception e)
+ {
+ stopReplication();
+ throw e;
+ }
+ }
+
+ /**
+ * Stops replication by resetting replication-related fields to their
'unreplicated' state.
+ */
+ @Override
+ public void stopReplication()
+ {
+
+ storageManagerLock.writeLock().lock();
+ try
+ {
+ bindingsJournal = originalBindingsJournal;
+ messageJournal = originalMessageJournal;
+ replicator = null;
+ }
+ finally
+ {
storageManagerLock.writeLock().unlock();
}
}
-
/**
* @param pageFilesToSync
* @throws Exception
@@ -522,15 +534,17 @@
}
}
- private JournalFile[] prepareJournalForCopy(Journal journal, JournalContent
contentType) throws Exception
+ private JournalFile[]
+ prepareJournalForCopy(Journal journal, JournalContent contentType, String
nodeID) throws Exception
{
- journal.forceMoveNextFile();
- JournalFile[] datafiles = journal.getDataFiles();
- replicator.sendStartSyncMessage(datafiles, contentType);
- return datafiles;
+ journal.forceMoveNextFile();
+ JournalFile[] datafiles = journal.getDataFiles();
+ replicator.sendStartSyncMessage(datafiles, contentType, nodeID);
+ return datafiles;
}
- public void waitOnOperations() throws Exception
+ @Override
+ public void waitOnOperations() throws Exception
{
if (!started)
{
@@ -540,9 +554,7 @@
waitOnOperations(0);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#blockOnReplication()
- */
+ @Override
public boolean waitOnOperations(final long timeout) throws Exception
{
if (!started)
@@ -553,46 +565,59 @@
return getContext().waitCompletion(timeout);
}
- /*
- *
- * (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#pageClosed(org.hornetq.utils.SimpleString,
int)
- */
- public void pageClosed(final SimpleString storeName, final int pageNumber)
+ @Override
+ public void pageClosed(final SimpleString storeName, final int pageNumber)
{
if (isReplicated())
- {
- replicator.pageClosed(storeName, pageNumber);
- }
+ {
+ readLock();
+ try
+ {
+ replicator.pageClosed(storeName, pageNumber);
+ }
+ finally
+ {
+ readUnLock();
+ }
+ }
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#pageDeleted(org.hornetq.utils.SimpleString,
int)
- */
- public void pageDeleted(final SimpleString storeName, final int pageNumber)
- {
- if (isReplicated())
- {
- replicator.pageDeleted(storeName, pageNumber);
- }
- }
+ @Override
+ public void pageDeleted(final SimpleString storeName, final int pageNumber)
+ {
+ if (isReplicated())
+ {
+ readLock();
+ try
+ {
+ replicator.pageDeleted(storeName, pageNumber);
+ }
+ finally
+ {
+ readUnLock();
+ }
+ }
+ }
- /*
- * (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#pageWrite(org.hornetq.utils.SimpleString,
- * int, org.hornetq.api.core.buffers.ChannelBuffer)
- */
+ @Override
public void pageWrite(final PagedMessage message, final int pageNumber)
{
if (isReplicated())
{
- replicator.pageWrite(message, pageNumber);
+ if (!message.getMessage().isDurable())
+ return;
+ readLock();
+ try
+ {
+ replicator.pageWrite(message, pageNumber);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#getContext()
- */
public OperationContext getContext()
{
return OperationContextImpl.getContext(executorFactory);
@@ -613,9 +638,6 @@
return newContext(singleThreadExecutor);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#newContext()
- */
public OperationContext newContext(final Executor executor)
{
return new OperationContextImpl(executor);
@@ -4061,16 +4083,10 @@
{
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
- */
public void beforeRollback(Transaction tx) throws Exception
{
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
- */
public void afterRollback(Transaction tx)
{
for (Long msg : confirmedMessages)
@@ -4099,4 +4115,5 @@
}
+
}
\ No newline at end of file
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -317,46 +317,23 @@
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#loadInternalOnly()
- */
public JournalLoadInformation[] loadInternalOnly() throws Exception
{
return null;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#isReplicated()
- */
- public boolean isReplicated()
- {
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#completeReplication()
- */
public void completeOperations()
{
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#pageClosed(org.hornetq.utils.SimpleString,
int)
- */
public void pageClosed(final SimpleString storeName, final int pageNumber)
{
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#pageDeleted(org.hornetq.utils.SimpleString,
int)
- */
public void pageDeleted(final SimpleString storeName, final int pageNumber)
{
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#pageWrite(org.hornetq.core.paging.PagedMessage,
int)
- */
public void pageWrite(final PagedMessage message, final int pageNumber)
{
}
@@ -369,9 +346,6 @@
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
- */
public boolean waitOnOperations(final long timeout) throws Exception
{
return true;
@@ -618,4 +592,10 @@
{
return false;
}
+
+ @Override
+ public void stopReplication()
+ {
+ // no-op
+ }
}
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/Channel.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/Channel.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/Channel.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -36,7 +36,7 @@
* @return the id
*/
long getID();
-
+
/** For protocol check */
boolean supports(byte packetID);
@@ -46,7 +46,7 @@
* @param packet the packet to send
*/
void send(Packet packet);
-
+
/**
* sends a packet on this channel using batching algorithm if appropriate
*
@@ -71,8 +71,8 @@
Packet sendBlocking(Packet packet) throws HornetQException;
/**
- * sets the {@link org.hornetq.core.protocol.core.ChannelHandler} that this channel
should forward received packets to.
- *
+ * Sets the {@link org.hornetq.core.protocol.core.ChannelHandler} that this channel
should
+ * forward received packets to.
* @param handler the handler
*/
void setHandler(ChannelHandler handler);
@@ -134,7 +134,7 @@
Lock getLock();
/**
- * returns the Remoting Connection being used by the channel
+ * returns the {@link CoreRemotingConnection} being used by the channel
*/
CoreRemotingConnection getConnection();
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -13,6 +13,7 @@
package org.hornetq.core.protocol.core.impl;
+import java.util.EnumSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@@ -59,10 +60,18 @@
{
this.id = id;
}
+
+ protected static String idToString(long code)
+ {
+ for (CHANNEL_ID channel:EnumSet.allOf(CHANNEL_ID.class)){
+ if (channel.id==code) return channel.toString();
+ }
+ return Long.toString(code);
+ }
}
private static final Logger log = Logger.getLogger(ChannelImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
private volatile long id;
@@ -118,11 +127,11 @@
resendCache = null;
}
}
-
+
public boolean supports(final byte packetType)
{
int version = connection.getClientVersion();
-
+
switch (packetType)
{
case PacketImpl.CLUSTER_TOPOLOGY_V2:
@@ -198,7 +207,7 @@
synchronized (sendLock)
{
packet.setChannelID(id);
-
+
if (isTrace)
{
log.trace("Sending packet nonblocking " + packet + " on
channeID=" + id);
@@ -237,7 +246,7 @@
{
lock.unlock();
}
-
+
if (isTrace)
{
log.trace("Writing buffer for channelID=" + id);
@@ -351,7 +360,10 @@
{
if (confWindowSize < 0)
{
- throw new IllegalStateException("You can't set confirmationHandler on a
connection with confirmation-window-size < 0. Look at the documentation for more
information.");
+ final String msg =
+ "You can't set confirmationHandler on a connection with
confirmation-window-size < 0."
+ + " Look at the documentation for more
information.";
+ throw new IllegalStateException(msg);
}
commandConfirmationHandler = handler;
}
@@ -575,4 +587,10 @@
firstStoredCommandID += numberToClear;
}
+
+ @Override
+ public String toString()
+ {
+ return "Channel[id=" + CHANNEL_ID.idToString(id) + ", handler="
+ handler + "]";
+ }
}
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -19,6 +19,7 @@
import java.util.concurrent.Executor;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
@@ -32,6 +33,7 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationFailedMessage;
import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
@@ -41,6 +43,7 @@
import
org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -208,10 +211,23 @@
} else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
{
BackupRegistrationMessage msg = (BackupRegistrationMessage)packet;
- if (server.startReplication(rc, acceptorUsed.getClusterConnection(),
getPair(msg.getConnector(), true)))
+ ClusterConnection clusterConnection =
acceptorUsed.getClusterConnection();
+
+ if (clusterConnection.verify(msg.getClusterUser(),
msg.getClusterPassword()))
{
- // XXX if it fails, the backup should get to know it
+ try
+ {
+ server.startReplication(rc, clusterConnection,
getPair(msg.getConnector(), true));
+ }
+ catch (HornetQException e)
+ {
+ channel0.send(new BackupRegistrationFailedMessage(e));
+ }
}
+ else
+ {
+ channel0.send(new BackupRegistrationFailedMessage(null));
+ }
}
}
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -88,6 +88,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationFailedMessage;
import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
@@ -547,6 +548,11 @@
packet = new BackupRegistrationMessage();
break;
}
+ case PacketImpl.BACKUP_REGISTRATION_FAILED:
+ {
+ packet = new BackupRegistrationFailedMessage();
+ break;
+ }
case PacketImpl.REPLICATION_START_FINISH_SYNC:
{
packet = new ReplicationStartSyncMessage();
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -147,7 +147,7 @@
public static final byte SESS_PRODUCER_REQUEST_CREDITS = 79;
public static final byte SESS_PRODUCER_CREDITS = 80;
-
+
public static final byte SESS_INDIVIDUAL_ACKNOWLEDGE = 81;
// Replication
@@ -179,15 +179,15 @@
public static final byte REPLICATION_COMPARE_DATA = 102;
public static final byte REPLICATION_SYNC_FILE = 103;
-
+
public static final byte SESS_ADD_METADATA = 104;
-
+
public static final byte SESS_ADD_METADATA2 = 105;
-
+
public static final byte SESS_UNIQUE_ADD_METADATA = 106;
-
-
+
+
// HA
public static final byte CLUSTER_TOPOLOGY = 110;
@@ -195,7 +195,7 @@
public static final byte NODE_ANNOUNCE = 111;
public static final byte SUBSCRIBE_TOPOLOGY = 112;
-
+
// For newer versions
public static final byte SUBSCRIBE_TOPOLOGY_V2 = 113;
@@ -203,9 +203,10 @@
public static final byte CLUSTER_TOPOLOGY_V2 = 114;
public static final byte BACKUP_REGISTRATION = 115;
+ public static final byte BACKUP_REGISTRATION_FAILED = 116;
public static final byte REPLICATION_START_FINISH_SYNC = 120;
-
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
@@ -288,7 +289,7 @@
{
return true;
}
-
+
public boolean isAsyncExec()
{
return false;
Copied:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java
(from rev 11800,
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java)
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java
(rev 0)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -0,0 +1,82 @@
+/**
+ *
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Informs the Backup trying to start replicating of an error.
+ */
+public final class BackupRegistrationFailedMessage extends PacketImpl
+{
+
+ enum BackupRegistrationProblem
+ {
+ EXCEPTION(0), AUTHENTICATION(1);
+ final int code;
+
+ private BackupRegistrationProblem(int code)
+ {
+ this.code = code;
+ }
+ }
+
+ int errorCode = -1;
+ BackupRegistrationProblem problem;
+
+ public BackupRegistrationFailedMessage(HornetQException e)
+ {
+ super(BACKUP_REGISTRATION_FAILED);
+ if (e != null)
+ {
+ errorCode = e.getCode();
+ problem = BackupRegistrationProblem.EXCEPTION;
+ }
+ else
+ {
+ problem = BackupRegistrationProblem.AUTHENTICATION;
+ }
+ }
+
+ public BackupRegistrationFailedMessage()
+ {
+ super(BACKUP_REGISTRATION_FAILED);
+ }
+
+ public int getCause()
+ {
+ return errorCode;
+ }
+
+ public BackupRegistrationProblem getRegistrationProblem()
+ {
+ return problem;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeInt(problem.code);
+ if (problem == BackupRegistrationProblem.EXCEPTION)
+ {
+ buffer.writeInt(errorCode);
+ }
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ if (buffer.readInt() == BackupRegistrationProblem.AUTHENTICATION.code)
+ {
+ problem = BackupRegistrationProblem.AUTHENTICATION;
+ }
+ else
+ {
+ problem = BackupRegistrationProblem.EXCEPTION;
+ errorCode = buffer.readInt();
+ }
+ }
+}
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -1,6 +1,3 @@
-/**
- *
- */
package org.hornetq.core.protocol.core.impl.wireformat;
import org.hornetq.api.core.HornetQBuffer;
@@ -8,23 +5,32 @@
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
- * Registers a backup node with its live server.
+ * Registers a given backup-server as the replicating backup of a live server (i.e. a
regular
+ * HornetQ).
* <p>
- * After registration the live server will initiate synchronization of its state with the
new backup
- * node.
+ * If it succeeds the backup will start synchronization of its state with the new backup
node, and
+ * replicating any new data. If it fails the backup server will receive a message
indicating
+ * failure, and should shutdown.
+ * @see BackupRegistrationFailedMessage
*/
-public class BackupRegistrationMessage extends PacketImpl
+public final class BackupRegistrationMessage extends PacketImpl
{
private TransportConfiguration connector;
private String nodeID;
- public BackupRegistrationMessage(String nodeId, TransportConfiguration tc)
+ private String clusterUser;
+
+ private String clusterPassword;
+
+ public BackupRegistrationMessage(String nodeId, TransportConfiguration tc, String
user, String password)
{
this();
connector = tc;
nodeID = nodeId;
+ clusterUser = user;
+ clusterPassword = password;
}
public BackupRegistrationMessage()
@@ -46,6 +52,8 @@
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeString(nodeID);
+ buffer.writeString(clusterUser);
+ buffer.writeString(clusterPassword);
connector.encode(buffer);
}
@@ -53,8 +61,26 @@
public void decodeRest(final HornetQBuffer buffer)
{
nodeID = buffer.readString();
+ clusterUser = buffer.readString();
+ clusterPassword = buffer.readString();
connector = new TransportConfiguration();
connector.decode(buffer);
}
+ /**
+ * @return
+ */
+ public String getClusterUser()
+ {
+ return clusterUser;
+ }
+
+ /**
+ * @return
+ */
+ public String getClusterPassword()
+ {
+ return clusterPassword;
+ }
+
}
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -15,7 +15,6 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
@@ -25,12 +24,7 @@
*/
public class CreateQueueMessage extends PacketImpl
{
- // Constants -----------------------------------------------------
- private static final Logger log = Logger.getLogger(CreateQueueMessage.class);
-
- // Attributes ----------------------------------------------------
-
private SimpleString address;
private SimpleString queueName;
@@ -43,10 +37,6 @@
private boolean requiresResponse;
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
public CreateQueueMessage(final SimpleString address,
final SimpleString queueName,
final SimpleString filterString,
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -30,9 +30,10 @@
this.nodeID = nodeID;
}
- public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent
contentType)
+ public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent
contentType, String nodeID)
{
this();
+ this.nodeID = nodeID;
synchronizationIsFinished = false;
ids = new long[datafiles.length];
for (int i = 0; i < datafiles.length; i++)
@@ -46,11 +47,9 @@
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeBoolean(synchronizationIsFinished);
+ buffer.writeString(nodeID);
if (synchronizationIsFinished)
- {
- buffer.writeString(nodeID);
return;
- }
buffer.writeByte(journalType.typeByte);
buffer.writeInt(ids.length);
for (long id : ids)
@@ -63,9 +62,9 @@
public void decodeRest(final HornetQBuffer buffer)
{
synchronizationIsFinished = buffer.readBoolean();
+ nodeID = buffer.readString();
if (synchronizationIsFinished)
{
- nodeID = buffer.readString();
return;
}
journalType = JournalContent.getType(buffer.readByte());
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -12,13 +12,17 @@
*/
package org.hornetq.core.protocol.stomp;
-import java.io.UnsupportedEncodingException;
-
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.Stomp.Headers;
import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.hornetq.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.utils.DataConstants;
/**
*
@@ -113,12 +117,7 @@
public abstract StompFrame onConnect(StompFrame frame);
public abstract StompFrame onDisconnect(StompFrame frame);
- public abstract StompFrame onSend(StompFrame frame);
public abstract StompFrame onAck(StompFrame request);
- public abstract StompFrame onBegin(StompFrame frame);
- public abstract StompFrame onCommit(StompFrame request);
- public abstract StompFrame onAbort(StompFrame request);
- public abstract StompFrame onSubscribe(StompFrame request);
public abstract StompFrame onUnsubscribe(StompFrame request);
public abstract StompFrame onStomp(StompFrame request);
public abstract StompFrame onNack(StompFrame request);
@@ -136,14 +135,219 @@
return receipt;
}
-
- public abstract StompFrame postprocess(StompFrame request);
- public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
- StompSubscription subscription, int deliveryCount) throws Exception;
-
public abstract StompFrame createStompFrame(String command);
public abstract StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer)
throws HornetQStompException;
+
+ public StompFrame onCommit(StompFrame request)
+ {
+ StompFrame response = null;
+
+ String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+ if (txID == null)
+ {
+ response = new HornetQStompException("transaction header is mandatory to
COMMIT a transaction").getFrame();
+ return response;
+ }
+ try
+ {
+ connection.commitTransaction(txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+ return response;
+ }
+
+ public StompFrame onSend(StompFrame frame)
+ {
+ StompFrame response = null;
+ try
+ {
+ connection.validate();
+ String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
+ String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+
+ long timestamp = System.currentTimeMillis();
+
+ ServerMessageImpl message = connection.createServerMessage();
+ message.setTimestamp(timestamp);
+ message.setAddress(SimpleString.toSimpleString(destination));
+ StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
+ if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
+ {
+ message.setType(Message.BYTES_TYPE);
+ message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
+ }
+ else
+ {
+ message.setType(Message.TEXT_TYPE);
+ String text = frame.getBody();
+
message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
+ }
+
+ connection.sendServerMessage(message, txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+ catch (Exception e)
+ {
+ response = new HornetQStompException("Error handling send",
e).getFrame();
+ }
+
+ return response;
+ }
+
+ public StompFrame onBegin(StompFrame frame)
+ {
+ StompFrame response = null;
+ String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+ if (txID == null)
+ {
+ response = new HornetQStompException("Need a transaction id to
begin").getFrame();
+ }
+ else
+ {
+ try
+ {
+ connection.beginTransaction(txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+ }
+ return response;
+ }
+
+ public StompFrame onAbort(StompFrame request)
+ {
+ StompFrame response = null;
+ String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+
+ if (txID == null)
+ {
+ response = new HornetQStompException("transaction header is mandatory to
ABORT a transaction").getFrame();
+ return response;
+ }
+
+ try
+ {
+ connection.abortTransaction(txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+
+ return response;
+ }
+
+ public StompFrame onSubscribe(StompFrame request)
+ {
+ StompFrame response = null;
+ String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
+
+ String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
+ String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
+ String id = request.getHeader(Stomp.Headers.Subscribe.ID);
+ String durableSubscriptionName =
request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
+ boolean noLocal = false;
+
+ if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL))
+ {
+ noLocal =
Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
+ }
+
+ try
+ {
+ connection.subscribe(destination, selector, ack, id, durableSubscriptionName,
noLocal);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+
+ return response;
+ }
+
+ public StompFrame postprocess(StompFrame request)
+ {
+ StompFrame response = null;
+ if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED))
+ {
+ response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+ if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
+ {
+ response.setNeedsDisconnect(true);
+ }
+ }
+ else
+ {
+ //request null, disconnect if so.
+ if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
+ {
+ this.connection.disconnect();
+ }
+ }
+ return response;
+ }
+
+ public StompFrame createMessageFrame(ServerMessage serverMessage,
+ StompSubscription subscription, int deliveryCount) throws Exception
+ {
+ StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
+
+ if (subscription.getID() != null)
+ {
+ frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION,
+ subscription.getID());
+ }
+
+ synchronized (serverMessage)
+ {
+
+ HornetQBuffer buffer = serverMessage.getBodyBuffer();
+
+ int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer
+ .writerIndex() : serverMessage.getEndOfBodyPosition();
+ int size = bodyPos - buffer.readerIndex();
+ buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE
+ + DataConstants.SIZE_INT);
+ byte[] data = new byte[size];
+
+ if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH)
+ || serverMessage.getType() == Message.BYTES_TYPE)
+ {
+ frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length));
+ buffer.readBytes(data);
+ }
+ else
+ {
+ SimpleString text = buffer.readNullableSimpleString();
+ if (text != null)
+ {
+ data = text.toString().getBytes("UTF-8");
+ }
+ else
+ {
+ data = new byte[0];
+ }
+ }
+ frame.setByteBody(data);
+
+ serverMessage.getBodyBuffer().resetReaderIndex();
+
+ StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
+ deliveryCount);
+ }
+
+ return frame;
+ }
+
}
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -31,7 +31,6 @@
import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
import org.hornetq.core.protocol.stomp.Stomp.Headers;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.utils.DataConstants;
/**
@@ -101,147 +100,6 @@
}
@Override
- public StompFrame onSend(StompFrame frame)
- {
- StompFrame response = null;
- try
- {
- connection.validate();
- String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
- String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
-
- long timestamp = System.currentTimeMillis();
-
- ServerMessageImpl message = connection.createServerMessage();
- message.setTimestamp(timestamp);
- message.setAddress(SimpleString.toSimpleString(destination));
- StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
- if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
- {
- message.setType(Message.BYTES_TYPE);
- message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
- }
- else
- {
- message.setType(Message.TEXT_TYPE);
- String text = frame.getBody();
-
message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
- }
-
- connection.sendServerMessage(message, txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- catch (Exception e)
- {
- response = new HornetQStompException("Error handling send",
e).getFrame();
- }
-
- return response;
- }
-
- @Override
- public StompFrame onBegin(StompFrame frame)
- {
- StompFrame response = null;
- String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- response = new HornetQStompException("Need a transaction id to
begin").getFrame();
- }
- else
- {
- try
- {
- connection.beginTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- }
- return response;
- }
-
- @Override
- public StompFrame onCommit(StompFrame request)
- {
- StompFrame response = null;
-
- String txID = request.getHeader(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- response = new HornetQStompException("transaction header is mandatory to
COMMIT a transaction").getFrame();
- return response;
- }
-
- try
- {
- connection.commitTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- return response;
- }
-
- @Override
- public StompFrame onAbort(StompFrame request)
- {
- StompFrame response = null;
- String txID = request.getHeader(Stomp.Headers.TRANSACTION);
-
- if (txID == null)
- {
- response = new HornetQStompException("transaction header is mandatory to
ABORT a transaction").getFrame();
- return response;
- }
-
- try
- {
- connection.abortTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
-
- return response;
- }
-
- @Override
- public StompFrame onSubscribe(StompFrame request)
- {
- StompFrame response = null;
- String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
-
- String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
- String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
- String id = request.getHeader(Stomp.Headers.Subscribe.ID);
- String durableSubscriptionName =
request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
- boolean noLocal = false;
-
- if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL))
- {
- noLocal =
Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
- }
-
- try
- {
- connection.subscribe(destination, selector, ack, id, durableSubscriptionName,
noLocal);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
-
- return response;
- }
-
- @Override
public StompFrame onUnsubscribe(StompFrame request)
{
StompFrame response = null;
@@ -313,56 +171,6 @@
}
@Override
- public StompFrame createMessageFrame(ServerMessage serverMessage,
- StompSubscription subscription, int deliveryCount) throws Exception
- {
- StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE);
-
- if (subscription.getID() != null)
- {
- frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
- }
-
- synchronized(serverMessage)
- {
-
- HornetQBuffer buffer = serverMessage.getBodyBuffer();
-
- int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
- :
serverMessage.getEndOfBodyPosition();
- int size = bodyPos - buffer.readerIndex();
- buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
- byte[] data = new byte[size];
-
- if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) ||
serverMessage.getType() == Message.BYTES_TYPE)
- {
- frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length));
- buffer.readBytes(data);
- }
- else
- {
- SimpleString text = buffer.readNullableSimpleString();
- if (text != null)
- {
- data = text.toString().getBytes("UTF-8");
- }
- else
- {
- data = new byte[0];
- }
- }
- frame.setByteBody(data);
-
- serverMessage.getBodyBuffer().resetReaderIndex();
-
- StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
- }
-
- return frame;
-
- }
-
- @Override
public StompFrame createStompFrame(String command)
{
return new StompFrameV10(command);
@@ -388,28 +196,5 @@
// TODO Auto-generated method stub
}
-
- @Override
- public StompFrame postprocess(StompFrame request)
- {
- StompFrame response = null;
- if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED))
- {
- response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
- if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
- {
- response.setNeedsDisconnect(true);
- }
- }
- else
- {
- //request null, disconnect if so.
- if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
- {
- this.connection.disconnect();
- }
- }
- return response;
- }
}
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -17,11 +17,7 @@
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.protocol.stomp.FrameEventListener;
import org.hornetq.core.protocol.stomp.HornetQStompException;
import org.hornetq.core.protocol.stomp.SimpleBytes;
@@ -29,12 +25,7 @@
import org.hornetq.core.protocol.stomp.StompConnection;
import org.hornetq.core.protocol.stomp.StompDecoder;
import org.hornetq.core.protocol.stomp.StompFrame;
-import org.hornetq.core.protocol.stomp.StompSubscription;
-import org.hornetq.core.protocol.stomp.StompUtils;
import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.ServerMessageImpl;
-import org.hornetq.utils.DataConstants;
/**
*
@@ -168,172 +159,8 @@
}
return null;
}
-
- @Override
- public StompFrame postprocess(StompFrame request)
- {
- StompFrame response = null;
- if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED))
- {
- response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
- if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
- {
- response.setNeedsDisconnect(true);
- }
- }
- else
- {
- //request null, disconnect if so.
- if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
- {
- this.connection.disconnect();
- }
- }
- return response;
- }
@Override
- public StompFrame onSend(StompFrame frame)
- {
- StompFrame response = null;
- try
- {
- connection.validate();
- String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
- String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
-
- long timestamp = System.currentTimeMillis();
-
- ServerMessageImpl message = connection.createServerMessage();
- message.setTimestamp(timestamp);
- message.setAddress(SimpleString.toSimpleString(destination));
- StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
- if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
- {
- message.setType(Message.BYTES_TYPE);
- message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
- }
- else
- {
- message.setType(Message.TEXT_TYPE);
- String text = frame.getBody();
-
message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
- }
-
- connection.sendServerMessage(message, txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- catch (Exception e)
- {
- response = new HornetQStompException("Error handling send",
e).getFrame();
- }
-
- return response;
- }
-
- @Override
- public StompFrame onBegin(StompFrame frame)
- {
- StompFrame response = null;
- String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- response = new HornetQStompException("Need a transaction id to
begin").getFrame();
- }
- else
- {
- try
- {
- connection.beginTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- }
- return response;
- }
-
- @Override
- public StompFrame onCommit(StompFrame request)
- {
- StompFrame response = null;
-
- String txID = request.getHeader(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- response = new HornetQStompException("transaction header is mandatory to
COMMIT a transaction").getFrame();
- return response;
- }
-
- try
- {
- connection.commitTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- return response;
- }
-
- @Override
- public StompFrame onAbort(StompFrame request)
- {
- StompFrame response = null;
- String txID = request.getHeader(Stomp.Headers.TRANSACTION);
-
- if (txID == null)
- {
- response = new HornetQStompException("transaction header is mandatory to
ABORT a transaction").getFrame();
- return response;
- }
-
- try
- {
- connection.abortTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
-
- return response;
- }
-
- @Override
- public StompFrame onSubscribe(StompFrame request)
- {
- StompFrame response = null;
- String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
-
- String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
- String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
- String id = request.getHeader(Stomp.Headers.Subscribe.ID);
- String durableSubscriptionName =
request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
- boolean noLocal = false;
-
- if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL))
- {
- noLocal =
Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
- }
-
- try
- {
- connection.subscribe(destination, selector, ack, id, durableSubscriptionName,
noLocal);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
-
- return response;
- }
-
- @Override
public StompFrame onUnsubscribe(StompFrame request)
{
StompFrame response = null;
@@ -413,53 +240,6 @@
}
@Override
- public StompFrame createMessageFrame(ServerMessage serverMessage,
- StompSubscription subscription, int deliveryCount)
- throws Exception
- {
- StompFrame frame = new StompFrameV11(Stomp.Responses.MESSAGE);
-
- if (subscription.getID() != null)
- {
- frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
- }
-
- HornetQBuffer buffer = serverMessage.getBodyBuffer();
-
- int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
- :
serverMessage.getEndOfBodyPosition();
- int size = bodyPos - buffer.readerIndex();
- buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
- byte[] data = new byte[size];
- if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) ||
serverMessage.getType() == Message.BYTES_TYPE)
- {
- frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length));
- buffer.readBytes(data);
- }
- else
- {
- SimpleString text = buffer.readNullableSimpleString();
- if (text != null)
- {
- data = text.toString().getBytes("UTF-8");
- }
- else
- {
- data = new byte[0];
- }
- }
-
- frame.setByteBody(data);
-
- serverMessage.getBodyBuffer().resetReaderIndex();
-
- StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
-
- return frame;
-
- }
-
- @Override
public void replySent(StompFrame reply)
{
if (reply.getCommand().equals(Stomp.Responses.CONNECTED))
@@ -544,6 +324,7 @@
lastPingTime.set(System.currentTimeMillis());
}
+ @Override
public void run()
{
lastAccepted.set(System.currentTimeMillis());
@@ -647,6 +428,7 @@
}
//all frame except CONNECT are decoded here.
+ @Override
public StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer) throws
HornetQStompException
{
int readable = buffer.readableBytes();
Deleted:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -1,49 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.core.journal.Journal;
-import org.hornetq.core.journal.JournalLoadInformation;
-import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.ChannelHandler;
-import org.hornetq.core.server.HornetQComponent;
-import org.hornetq.core.server.impl.QuorumManager;
-
-/**
- * A ReplicationEndpoint
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface ReplicationEndpoint extends ChannelHandler, HornetQComponent
-{
-
- void setChannel(Channel channel);
-
- Channel getChannel();
-
- void compareJournalInformation(JournalLoadInformation[] journalInformation) throws
HornetQException;
-
- void registerJournal(final byte id, final Journal journal);
-
- /**
- * Sets the quorumManager used by the server in the replicationEndpoint. It is used to
inform the
- * backup server of the live's nodeID.
- * @param quorumManager
- */
- void setQuorumManager(QuorumManager quorumManager);
-
-}
Copied:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
(from rev 11800,
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java)
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
(rev 0)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -0,0 +1,923 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.IOCriticalErrorListener;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.impl.FileWrapperJournal;
+import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.impl.PagingManagerImpl;
+import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.ChannelHandler;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
+import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
+import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
+import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.QuorumManager;
+
+/**
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class ReplicationEndpoint implements ChannelHandler, HornetQComponent
+{
+
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(ReplicationEndpoint.class);
+
+ // Attributes ----------------------------------------------------
+
+ private static final boolean trace = log.isTraceEnabled();
+
+ private final IOCriticalErrorListener criticalErrorListener;
+
+ private final HornetQServerImpl server;
+
+ private Channel channel;
+
+ private Journal[] journals;
+ private final JournalLoadInformation[] journalLoadInformation = new
JournalLoadInformation[2];
+
+ /** Files reserved in each journal for synchronization of existing data from the
'live' server. */
+ private final Map<JournalContent, Map<Long, JournalSyncFile>>
filesReservedForSync =
+ new HashMap<JournalContent, Map<Long, JournalSyncFile>>();
+ private final Map<Long, LargeServerMessage> largeMessagesOnSync = new
HashMap<Long, LargeServerMessage>();
+
+ /**
+ * Used to hold the real Journals before the backup is synchronized. This field should
be
+ * {@code null} on an up-to-date server.
+ */
+ private Map<JournalContent, Journal> journalsHolder = new
HashMap<JournalContent, Journal>();
+
+ private StorageManager storage;
+
+ private PagingManager pageManager;
+
+ private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>>
pageIndex =
+ new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer,
Page>>();
+ private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
+ new ConcurrentHashMap<Long, LargeServerMessage>();
+
+ // Used on tests, to simulate failures on delete pages
+ private boolean deletePages = true;
+ private boolean started;
+
+ private QuorumManager quorumManager;
+
+ // Constructors --------------------------------------------------
+ public ReplicationEndpoint(final HornetQServerImpl server, IOCriticalErrorListener
criticalErrorListener)
+ {
+ this.server = server;
+ this.criticalErrorListener = criticalErrorListener;
+ }
+
+ // Public --------------------------------------------------------
+
+ public void registerJournal(final byte id, final Journal journal)
+ {
+ if (journals == null || id >= journals.length)
+ {
+ Journal[] oldJournals = journals;
+ journals = new Journal[id + 1];
+
+ if (oldJournals != null)
+ {
+ for (int i = 0 ; i < oldJournals.length; i++)
+ {
+ journals[i] = oldJournals[i];
+ }
+ }
+ }
+
+ journals[id] = journal;
+ }
+
+ @Override
+ public void handlePacket(final Packet packet)
+ {
+ PacketImpl response = new ReplicationResponseMessage();
+ final byte type=packet.getType();
+
+ try
+ {
+ if (type == PacketImpl.REPLICATION_APPEND)
+ {
+ handleAppendAddRecord((ReplicationAddMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_APPEND_TX)
+ {
+ handleAppendAddTXRecord((ReplicationAddTXMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_DELETE)
+ {
+ handleAppendDelete((ReplicationDeleteMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_DELETE_TX)
+ {
+ handleAppendDeleteTX((ReplicationDeleteTXMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_PREPARE)
+ {
+ handlePrepare((ReplicationPrepareMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
+ {
+ handleCommitRollback((ReplicationCommitMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_PAGE_WRITE)
+ {
+ handlePageWrite((ReplicationPageWriteMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_PAGE_EVENT)
+ {
+ handlePageEvent((ReplicationPageEventMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN)
+ {
+ handleLargeMessageBegin((ReplicationLargeMessageBeingMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE)
+ {
+ handleLargeMessageWrite((ReplicationLargeMessageWriteMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_END)
+ {
+ handleLargeMessageEnd((ReplicationLargeMessageEndMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_COMPARE_DATA)
+ {
+ handleCompareDataMessage((ReplicationCompareDataMessage)packet);
+ response = new NullResponseMessage();
+ }
+ else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC)
+ {
+ handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
+ }
+ else if (type == PacketImpl.REPLICATION_SYNC_FILE)
+ {
+ handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
+ }
+ else
+ {
+ log.warn("Packet " + packet + " can't be processed by the
ReplicationEndpoint");
+ }
+ }
+ catch (HornetQException e)
+ {
+ log.warn(e.getMessage(), e);
+ response = new HornetQExceptionMessage(e);
+ }
+ catch (Exception e)
+ {
+ ReplicationEndpoint.log.warn(e.getMessage(), e);
+ response = new HornetQExceptionMessage((HornetQException)e);
+ }
+
+ channel.send(response);
+ }
+
+ public boolean isStarted()
+ {
+ return started;
+ }
+
+ public synchronized void start() throws Exception
+ {
+ Configuration config = server.getConfiguration();
+ try
+ {
+ storage = server.getStorageManager();
+ storage.start();
+
+ server.getManagementService().setStorageManager(storage);
+
+ journalsHolder.put(JournalContent.BINDINGS, storage.getBindingsJournal());
+ journalsHolder.put(JournalContent.MESSAGES, storage.getMessageJournal());
+
+ for (JournalContent jc : EnumSet.allOf(JournalContent.class))
+ {
+ filesReservedForSync.put(jc, new HashMap<Long, JournalSyncFile>());
+ // We only need to load internal structures on the backup...
+ journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly();
+ }
+
+ pageManager = new PagingManagerImpl(new
PagingStoreFactoryNIO(config.getPagingDirectory(),
+
config.getJournalBufferSize_NIO(),
+
server.getScheduledPool(),
+
server.getExecutorFactory(),
+
config.isJournalSyncNonTransactional(), criticalErrorListener),
+ storage,
+ server.getAddressSettingsRepository());
+
+ pageManager.start();
+
+ started = true;
+ }
+ catch (Exception e)
+ {
+ if (!server.isStopped())
+ throw e;
+ }
+ }
+
+ public synchronized void stop() throws Exception
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ // This could be null if the backup server is being
+ // shut down without any live server connecting here
+ if (channel != null)
+ {
+ channel.close();
+ }
+
+ for (ConcurrentMap<Integer, Page> map : pageIndex.values())
+ {
+ for (Page page : map.values())
+ {
+ try
+ {
+ page.close();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error while closing the page on backup", e);
+ }
+ }
+ }
+
+ pageIndex.clear();
+
+ for (LargeServerMessage largeMessage : largeMessages.values())
+ {
+ largeMessage.releaseResources();
+ }
+ largeMessages.clear();
+
+ for (LargeServerMessage largeMessage : largeMessagesOnSync.values())
+ {
+ largeMessage.releaseResources();
+ }
+ largeMessagesOnSync.clear();
+
+ for (Entry<JournalContent, Map<Long, JournalSyncFile>> entry :
filesReservedForSync.entrySet())
+ {
+ for (JournalSyncFile filesReserved : entry.getValue().values())
+ {
+ filesReserved.close();
+ }
+ }
+
+ filesReservedForSync.clear();
+ if (journals != null)
+ {
+ for (Journal j : journals)
+ {
+ if (j instanceof FileWrapperJournal)
+ j.stop();
+ }
+ }
+
+ pageManager.stop();
+
+ // Storage needs to be the last to stop
+ storage.stop();
+
+ started = false;
+ }
+
+
+ public Channel getChannel()
+ {
+ return channel;
+ }
+
+ public void setChannel(final Channel channel)
+ {
+ this.channel = channel;
+ }
+
+ public void compareJournalInformation(final JournalLoadInformation[]
journalInformation) throws HornetQException
+ {
+ if (!server.isRemoteBackupUpToDate())
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Cannot compare
journals if not in sync!");
+ }
+
+ if (journalLoadInformation == null || journalLoadInformation.length !=
journalInformation.length)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR,
+ "Live Node contains more journals than the
backup node. Probably a version match error");
+ }
+
+ for (int i = 0; i < journalInformation.length; i++)
+ {
+ if (!journalInformation[i].equals(journalLoadInformation[i]))
+ {
+ log.warn("Journal comparison mismatch:\n" +
journalParametersToString(journalInformation));
+ throw new HornetQException(HornetQException.ILLEGAL_STATE,
+ "Backup node can't connect to the live
node as the data differs");
+ }
+ }
+
+ }
+
+ /** Used on tests only. To simulate missing page deletes*/
+ public void setDeletePages(final boolean deletePages)
+ {
+ this.deletePages = deletePages;
+ }
+
+ /**
+ * @param journalInformation
+ */
+ private String journalParametersToString(final JournalLoadInformation[]
journalInformation)
+ {
+ return "**********************************************************\n" +
"parameters:\n" +
+ "Bindings = " +
+ journalInformation[0] +
+ "\n" +
+ "Messaging = " +
+ journalInformation[1] +
+ "\n" +
+ "**********************************************************" +
+ "\n" +
+ "Expected:" +
+ "\n" +
+ "Bindings = " +
+ journalLoadInformation[0] +
+ "\n" +
+ "Messaging = " +
+ journalLoadInformation[1] +
+ "\n" +
+ "**********************************************************";
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private void finishSynchronization(String liveID) throws Exception
+ {
+ for (JournalContent jc : EnumSet.allOf(JournalContent.class))
+ {
+ JournalImpl journal = (JournalImpl)journalsHolder.remove(jc);
+ journal.synchronizationLock();
+ try
+ {
+ if (journal.getDataFiles().length != 0)
+ {
+ throw new IllegalStateException("Journal should not have any data
files at this point");
+ }
+ // files should be already in place.
+ filesReservedForSync.remove(jc);
+ registerJournal(jc.typeByte, journal);
+ journal.stop();
+ journal.start();
+ journal.loadInternalOnly();
+ }
+ finally
+ {
+ journal.synchronizationUnlock();
+ }
+ }
+ synchronized (largeMessagesOnSync)
+ {
+ synchronized (largeMessages)
+ {
+ ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
+ for (Entry<Long, LargeServerMessage> entry : largeMessages.entrySet())
+ {
+ Long id = entry.getKey();
+ LargeServerMessage lm = entry.getValue();
+ if (largeMessagesOnSync.containsKey(id))
+ {
+ SequentialFile sq = lm.getFile();
+ LargeServerMessage mainLM = largeMessagesOnSync.get(id);
+ SequentialFile mainSeqFile = mainLM.getFile();
+ for (;;)
+ {
+ buffer.rewind();
+ int size = sq.read(buffer);
+ mainSeqFile.writeInternal(buffer);
+ if (size < buffer.capacity())
+ {
+ break;
+ }
+ }
+ }
+ else
+ {
+ // these are large-messages created after sync started
+ largeMessagesOnSync.put(id, lm);
+ }
+ }
+ largeMessages.clear();
+ largeMessages.putAll(largeMessagesOnSync);
+ largeMessagesOnSync.clear();
+ }
+ }
+ journalsHolder = null;
+ quorumManager.setLiveID(liveID);
+ server.setRemoteBackupUpToDate(liveID);
+ log.info("Backup server " + server + " is synchronized with
live-server.");
+ return;
+ }
+
+ private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws
Exception
+ {
+ Long id = Long.valueOf(msg.getId());
+ byte[] data = msg.getData();
+ SequentialFile channel;
+ switch (msg.getFileType())
+ {
+ case LARGE_MESSAGE:
+ {
+ synchronized (largeMessagesOnSync)
+ {
+ LargeServerMessage largeMessage = largeMessagesOnSync.get(id);
+ if (largeMessage == null)
+ {
+ largeMessage = storage.createLargeMessage();
+ largeMessage.setDurable(true);
+ largeMessage.setMessageID(id);
+ largeMessagesOnSync.put(id, largeMessage);
+ }
+ channel = largeMessage.getFile();
+ }
+ break;
+ }
+ case PAGE:
+ {
+ Page page = getPage(msg.getPageStore(), (int)msg.getId());
+
+ channel = page.getFile();
+ break;
+ }
+ case JOURNAL:
+ {
+ JournalSyncFile journalSyncFile =
filesReservedForSync.get(msg.getJournalContent()).get(id);
+ FileChannel channel2 = journalSyncFile.getChannel();
+ if (data == null)
+ {
+ channel2.close();
+ return;
+ }
+ channel2.write(ByteBuffer.wrap(data));
+ return;
+ }
+ default:
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Unhandled
file type " + msg.getFileType());
+ }
+
+ if (data == null)
+ {
+ channel.close();
+ return;
+ }
+
+ if (!channel.isOpen())
+ {
+ channel.open(1, false);
+ }
+ channel.writeDirect(ByteBuffer.wrap(data), true);
+ }
+
+ /**
+ * Reserves files (with the given fileID) in the specified journal, and places a
+ * {@link FileWrapperJournal} in place to store messages while synchronization is
going on.
+ * @param packet
+ * @throws Exception
+ */
+ private void handleStartReplicationSynchronization(final ReplicationStartSyncMessage
packet) throws Exception
+ {
+ if (server.isRemoteBackupUpToDate())
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup
can not be up-to-date!");
+ }
+
+ if (packet.isSynchronizationFinished())
+ {
+ finishSynchronization(packet.getNodeID());
+ return;
+ }
+
+ final Journal journal = journalsHolder.get(packet.getJournalContentType());
+ synchronized (this)
+ {
+ if (!started)
+ return;
+ if (packet.getNodeID() != null)
+ {
+ quorumManager.setLiveID(packet.getNodeID());
+ }
+ Map<Long, JournalSyncFile> mapToFill =
filesReservedForSync.get(packet.getJournalContentType());
+ log.info("Journal " + packet.getJournalContentType() + ".
Reserving fileIDs for synchronization: " +
+ Arrays.toString(packet.getFileIds()));
+
+ for (Entry<Long, JournalFile> entry :
journal.createFilesForBackupSync(packet.getFileIds()).entrySet())
+ {
+ mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
+ }
+ FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
+ registerJournal(packet.getJournalContentType().typeByte, syncJournal);
+ }
+ }
+
+ private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
+ {
+ LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), true);
+
+ if (message != null)
+ {
+ try
+ {
+ message.deleteFile();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error deleting large message ID = " +
packet.getMessageId(), e);
+ }
+ }
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleLargeMessageWrite(final ReplicationLargeMessageWriteMessage packet)
throws Exception
+ {
+ LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), false);
+ if (message != null)
+ {
+ message.addBytes(packet.getBody());
+ }
+ }
+
+ /**
+ * @param request
+ */
+ private void handleCompareDataMessage(final ReplicationCompareDataMessage request)
throws HornetQException
+ {
+ compareJournalInformation(request.getJournalInformation());
+ }
+
+ private LargeServerMessage lookupLargeMessage(final long messageId, final boolean
delete)
+ {
+ LargeServerMessage message;
+
+ if (delete)
+ {
+ message = largeMessages.remove(messageId);
+ }
+ else
+ {
+ message = largeMessages.get(messageId);
+ if (message == null)
+ {
+ synchronized (largeMessages)
+ {
+ if (!server.isRemoteBackupUpToDate())
+ {
+ // in case we need to append data to a file while still sync'ing
the backup
+ createLargeMessage(messageId, true);
+ message = largeMessages.get(messageId);
+ }
+ }
+ }
+ }
+
+ if (message == null)
+ {
+ log.warn("Large MessageID " + messageId +
+ " is not available on backup server.
Ignoring replication message");
+ }
+
+ return message;
+
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleLargeMessageBegin(final ReplicationLargeMessageBeingMessage
packet)
+ {
+ final long id = packet.getMessageId();
+ createLargeMessage(id, false);
+ log.trace("Receiving Large Message " + id + " on backup");
+ }
+
+ private void createLargeMessage(final long id, boolean sync)
+ {
+ LargeServerMessage msg = storage.createLargeMessage();
+ msg.setDurable(true);
+ msg.setMessageID(id);
+ msg.setReplicationSync(sync);
+ largeMessages.put(id, msg);
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleCommitRollback(final ReplicationCommitMessage packet) throws
Exception
+ {
+ Journal journalToUse = getJournal(packet.getJournalID());
+
+ if (packet.isRollback())
+ {
+ journalToUse.appendRollbackRecord(packet.getTxId(), packet.getSync());
+ }
+ else
+ {
+ journalToUse.appendCommitRecord(packet.getTxId(), packet.getSync());
+ }
+ }
+
+ /**
+ * @param packet
+ */
+ private void handlePrepare(final ReplicationPrepareMessage packet) throws Exception
+ {
+ Journal journalToUse = getJournal(packet.getJournalID());
+
+ journalToUse.appendPrepareRecord(packet.getTxId(), packet.getRecordData(), false);
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleAppendDeleteTX(final ReplicationDeleteTXMessage packet) throws
Exception
+ {
+ Journal journalToUse = getJournal(packet.getJournalID());
+
+ journalToUse.appendDeleteRecordTransactional(packet.getTxId(), packet.getId(),
packet.getRecordData());
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleAppendDelete(final ReplicationDeleteMessage packet) throws
Exception
+ {
+ Journal journalToUse = getJournal(packet.getJournalID());
+
+ journalToUse.appendDeleteRecord(packet.getId(), false);
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleAppendAddTXRecord(final ReplicationAddTXMessage packet) throws
Exception
+ {
+ Journal journalToUse = getJournal(packet.getJournalID());
+
+ if (packet.isUpdate())
+ {
+ journalToUse.appendUpdateRecordTransactional(packet.getTxId(),
+ packet.getId(),
+ packet.getRecordType(),
+ packet.getRecordData());
+ }
+ else
+ {
+ journalToUse.appendAddRecordTransactional(packet.getTxId(),
+ packet.getId(),
+ packet.getRecordType(),
+ packet.getRecordData());
+ }
+ }
+
+ /**
+ * @param packet
+ * @throws Exception
+ */
+ private void handleAppendAddRecord(final ReplicationAddMessage packet) throws
Exception
+ {
+ Journal journalToUse = getJournal(packet.getJournalID());
+
+ if (packet.isUpdate())
+ {
+ if (ReplicationEndpoint.trace)
+ {
+ log.trace("Endpoint appendUpdate id = " + packet.getId());
+ }
+ journalToUse.appendUpdateRecord(packet.getId(), packet.getRecordType(),
packet.getRecordData(), false);
+ }
+ else
+ {
+ if (ReplicationEndpoint.trace)
+ {
+ log.trace("Endpoint append id = " + packet.getId());
+ }
+ journalToUse.appendAddRecord(packet.getId(), packet.getRecordType(),
packet.getRecordData(), false);
+ }
+ }
+
+ /**
+ * @param packet
+ */
+ private void handlePageEvent(final ReplicationPageEventMessage packet) throws
Exception
+ {
+ ConcurrentMap<Integer, Page> pages = getPageMap(packet.getStoreName());
+
+ Page page = pages.remove(packet.getPageNumber());
+
+ if (page == null)
+ {
+ page = getPage(packet.getStoreName(), packet.getPageNumber());
+ }
+
+ if (page != null)
+ {
+ if (packet.isDelete())
+ {
+ if (deletePages)
+ {
+ page.delete(null);
+ }
+ }
+ else
+ {
+ page.close();
+ }
+ }
+
+ }
+
+ /**
+ * @param packet
+ */
+ private void handlePageWrite(final ReplicationPageWriteMessage packet) throws
Exception
+ {
+ PagedMessage pgdMessage = packet.getPagedMessage();
+ pgdMessage.initMessage(storage);
+ ServerMessage msg = pgdMessage.getMessage();
+ Page page = getPage(msg.getAddress(), packet.getPageNumber());
+ page.write(pgdMessage);
+ }
+
+ private ConcurrentMap<Integer, Page> getPageMap(final SimpleString storeName)
+ {
+ ConcurrentMap<Integer, Page> resultIndex = pageIndex.get(storeName);
+
+ if (resultIndex == null)
+ {
+ resultIndex = new ConcurrentHashMap<Integer, Page>();
+ ConcurrentMap<Integer, Page> mapResult = pageIndex.putIfAbsent(storeName,
resultIndex);
+ if (mapResult != null)
+ {
+ resultIndex = mapResult;
+ }
+ }
+
+ return resultIndex;
+ }
+
+ private Page getPage(final SimpleString storeName, final int pageId) throws Exception
+ {
+ ConcurrentMap<Integer, Page> map = getPageMap(storeName);
+
+ Page page = map.get(pageId);
+
+ if (page == null)
+ {
+ page = newPage(pageId, storeName, map);
+ }
+
+ return page;
+ }
+
+ /**
+ * @param pageId
+ * @param map
+ * @return
+ */
+ private synchronized Page newPage(final int pageId,
+ final SimpleString storeName,
+ final ConcurrentMap<Integer, Page> map) throws
Exception
+ {
+ Page page = map.get(pageId);
+
+ if (page == null)
+ {
+ page = pageManager.getPageStore(storeName).createPage(pageId);
+ page.open();
+ map.put(pageId, page);
+ }
+
+ return page;
+ }
+
+ /**
+ * @param journalID
+ * @return
+ */
+ private Journal getJournal(final byte journalID)
+ {
+ return journals[journalID];
+ }
+
+ public static class JournalSyncFile
+ {
+
+ private FileChannel channel;
+ private final File file;
+
+ public JournalSyncFile(JournalFile jFile) throws Exception
+ {
+ SequentialFile seqFile = jFile.getFile();
+ file = seqFile.getJavaFile();
+ seqFile.close();
+ }
+
+ FileChannel getChannel() throws Exception
+ {
+ if (channel == null)
+ {
+ channel = new FileOutputStream(file).getChannel();
+ }
+ return channel;
+ }
+
+ void close() throws IOException
+ {
+ if (channel != null)
+ channel.close();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "JournalSyncFile(file=" + file.getAbsolutePath() +
")";
+ }
+ }
+
+ /**
+ * Sets the quorumManager used by the server in the replicationEndpoint. It is used to
inform the
+ * backup server of the live's nodeID.
+ * @param quorumManager
+ */
+ public void setQuorumManager(QuorumManager quorumManager)
+ {
+ this.quorumManager = quorumManager;
+ }
+}
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -93,7 +93,8 @@
* @param contentType
* @throws HornetQException
*/
- void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType) throws
HornetQException;
+ void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType, String
nodeID)
+
throws HornetQException;
/**
* Informs backup that data synchronization is done.
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -27,7 +27,6 @@
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.replication.ReplicationManager;
@@ -42,10 +41,6 @@
public class ReplicatedJournal implements Journal
{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(ReplicatedJournal.class);
-
// Attributes ----------------------------------------------------
private static final boolean trace = false;
Deleted:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -1,916 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication.impl;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.journal.IOCriticalErrorListener;
-import org.hornetq.core.journal.Journal;
-import org.hornetq.core.journal.JournalLoadInformation;
-import org.hornetq.core.journal.SequentialFile;
-import org.hornetq.core.journal.impl.FileWrapperJournal;
-import org.hornetq.core.journal.impl.JournalFile;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.Page;
-import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.impl.PagingManagerImpl;
-import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
-import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
-import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
-import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.HornetQServerImpl;
-import org.hornetq.core.server.impl.QuorumManager;
-
-/**
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ReplicationEndpointImpl implements ReplicationEndpoint
-{
-
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(ReplicationEndpointImpl.class);
-
- // Attributes ----------------------------------------------------
-
- private static final boolean trace = log.isTraceEnabled();
-
- private final IOCriticalErrorListener criticalErrorListener;
-
- private final HornetQServerImpl server;
-
- private Channel channel;
-
- private Journal[] journals;
- private final JournalLoadInformation[] journalLoadInformation = new
JournalLoadInformation[2];
-
- /** Files reserved in each journal for synchronization of existing data from the
'live' server. */
- private final Map<JournalContent, Map<Long, JournalSyncFile>>
filesReservedForSync =
- new HashMap<JournalContent, Map<Long, JournalSyncFile>>();
- private final Map<Long, LargeServerMessage> largeMessagesOnSync = new
HashMap<Long, LargeServerMessage>();
-
- /**
- * Used to hold the real Journals before the backup is synchronized. This field should
be
- * {@code null} on an up-to-date server.
- */
- private Map<JournalContent, Journal> journalsHolder = new
HashMap<JournalContent, Journal>();
-
- private StorageManager storage;
-
- private PagingManager pageManager;
-
- private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>>
pageIndex =
- new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer,
Page>>();
- private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
- new ConcurrentHashMap<Long, LargeServerMessage>();
-
- // Used on tests, to simulate failures on delete pages
- private boolean deletePages = true;
- private boolean started;
-
- private QuorumManager quorumManager;
-
- // Constructors --------------------------------------------------
- public ReplicationEndpointImpl(final HornetQServerImpl server, IOCriticalErrorListener
criticalErrorListener)
- {
- this.server = server;
- this.criticalErrorListener = criticalErrorListener;
- }
-
- // Public --------------------------------------------------------
-
- public void registerJournal(final byte id, final Journal journal)
- {
- if (journals == null || id >= journals.length)
- {
- Journal[] oldJournals = journals;
- journals = new Journal[id + 1];
-
- if (oldJournals != null)
- {
- for (int i = 0 ; i < oldJournals.length; i++)
- {
- journals[i] = oldJournals[i];
- }
- }
- }
-
- journals[id] = journal;
- }
-
- public void handlePacket(final Packet packet)
- {
- PacketImpl response = new ReplicationResponseMessage();
- final byte type=packet.getType();
-
- try
- {
- if (type == PacketImpl.REPLICATION_APPEND)
- {
- handleAppendAddRecord((ReplicationAddMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_APPEND_TX)
- {
- handleAppendAddTXRecord((ReplicationAddTXMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_DELETE)
- {
- handleAppendDelete((ReplicationDeleteMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_DELETE_TX)
- {
- handleAppendDeleteTX((ReplicationDeleteTXMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_PREPARE)
- {
- handlePrepare((ReplicationPrepareMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
- {
- handleCommitRollback((ReplicationCommitMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_PAGE_WRITE)
- {
- handlePageWrite((ReplicationPageWriteMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_PAGE_EVENT)
- {
- handlePageEvent((ReplicationPageEventMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN)
- {
- handleLargeMessageBegin((ReplicationLargeMessageBeingMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE)
- {
- handleLargeMessageWrite((ReplicationLargeMessageWriteMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_END)
- {
- handleLargeMessageEnd((ReplicationLargeMessageEndMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_COMPARE_DATA)
- {
- handleCompareDataMessage((ReplicationCompareDataMessage)packet);
- response = new NullResponseMessage();
- }
- else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC)
- {
- handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_SYNC_FILE)
- {
- handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
- }
- else
- {
- log.warn("Packet " + packet + " can't be processed by the
ReplicationEndpoint");
- }
- }
- catch (HornetQException e)
- {
- log.warn(e.getMessage(), e);
- response = new HornetQExceptionMessage(e);
- }
- catch (Exception e)
- {
- ReplicationEndpointImpl.log.warn(e.getMessage(), e);
- response = new HornetQExceptionMessage((HornetQException)e);
- }
-
- channel.send(response);
- }
-
- public boolean isStarted()
- {
- return started;
- }
-
- public synchronized void start() throws Exception
- {
- Configuration config = server.getConfiguration();
-
- storage = server.getStorageManager();
- storage.start();
-
- server.getManagementService().setStorageManager(storage);
-
- journalsHolder.put(JournalContent.BINDINGS, storage.getBindingsJournal());
- journalsHolder.put(JournalContent.MESSAGES, storage.getMessageJournal());
-
- for (JournalContent jc : EnumSet.allOf(JournalContent.class))
- {
- filesReservedForSync.put(jc, new HashMap<Long, JournalSyncFile>());
- // We only need to load internal structures on the backup...
- journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly();
- }
-
- pageManager = new PagingManagerImpl(new
PagingStoreFactoryNIO(config.getPagingDirectory(),
-
config.getJournalBufferSize_NIO(),
-
server.getScheduledPool(),
-
server.getExecutorFactory(),
-
config.isJournalSyncNonTransactional(), criticalErrorListener),
- storage,
- server.getAddressSettingsRepository());
-
- pageManager.start();
-
- started = true;
-
- }
-
- public synchronized void stop() throws Exception
- {
- if (!started)
- {
- return;
- }
-
- // This could be null if the backup server is being
- // shut down without any live server connecting here
- if (channel != null)
- {
- channel.close();
- }
- storage.stop();
-
- for (ConcurrentMap<Integer, Page> map : pageIndex.values())
- {
- for (Page page : map.values())
- {
- try
- {
- page.close();
- }
- catch (Exception e)
- {
- log.warn("Error while closing the page on backup", e);
- }
- }
- }
-
- pageIndex.clear();
-
- for (LargeServerMessage largeMessage : largeMessages.values())
- {
- largeMessage.releaseResources();
- }
- largeMessages.clear();
-
- for (LargeServerMessage largeMessage : largeMessagesOnSync.values())
- {
- largeMessage.releaseResources();
- }
- largeMessagesOnSync.clear();
-
- for (Entry<JournalContent, Map<Long, JournalSyncFile>> entry :
filesReservedForSync.entrySet())
- {
- for (JournalSyncFile filesReserved : entry.getValue().values())
- {
- filesReserved.close();
- }
- }
-
- filesReservedForSync.clear();
- if (journals != null)
- {
- for (Journal j : journals)
- {
- if (j instanceof FileWrapperJournal)
- j.stop();
- }
- }
-
- pageManager.stop();
-
- // Storage needs to be the last to stop
- storage.stop();
-
- started = false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationEndpoint#getChannel()
- */
- public Channel getChannel()
- {
- return channel;
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.replication.ReplicationEndpoint#setChannel(org.hornetq.core.remoting.Channel)
- */
- public void setChannel(final Channel channel)
- {
- this.channel = channel;
- }
-
- public void compareJournalInformation(final JournalLoadInformation[]
journalInformation) throws HornetQException
- {
- if (!server.isRemoteBackupUpToDate())
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Cannot compare
journals if not in sync!");
- }
-
- if (journalLoadInformation == null || journalLoadInformation.length !=
journalInformation.length)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR,
- "Live Node contains more journals than the
backup node. Probably a version match error");
- }
-
- for (int i = 0; i < journalInformation.length; i++)
- {
- if (!journalInformation[i].equals(journalLoadInformation[i]))
- {
- log.warn("Journal comparison mismatch:\n" +
journalParametersToString(journalInformation));
- throw new HornetQException(HornetQException.ILLEGAL_STATE,
- "Backup node can't connect to the live
node as the data differs");
- }
- }
-
- }
-
- /** Used on tests only. To simulate missing page deletes*/
- public void setDeletePages(final boolean deletePages)
- {
- this.deletePages = deletePages;
- }
-
- /**
- * @param journalInformation
- */
- private String journalParametersToString(final JournalLoadInformation[]
journalInformation)
- {
- return "**********************************************************\n" +
"parameters:\n" +
- "Bindings = " +
- journalInformation[0] +
- "\n" +
- "Messaging = " +
- journalInformation[1] +
- "\n" +
- "**********************************************************" +
- "\n" +
- "Expected:" +
- "\n" +
- "Bindings = " +
- journalLoadInformation[0] +
- "\n" +
- "Messaging = " +
- journalLoadInformation[1] +
- "\n" +
- "**********************************************************";
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private void finishSynchronization(String liveID) throws Exception
- {
- for (JournalContent jc : EnumSet.allOf(JournalContent.class))
- {
- JournalImpl journal = (JournalImpl)journalsHolder.remove(jc);
- journal.synchronizationLock();
- try
- {
- if (journal.getDataFiles().length != 0)
- {
- throw new IllegalStateException("Journal should not have any data
files at this point");
- }
- // files should be already in place.
- filesReservedForSync.remove(jc);
- registerJournal(jc.typeByte, journal);
- journal.stop();
- journal.start();
- journal.loadInternalOnly();
- }
- finally
- {
- journal.synchronizationUnlock();
- }
- }
- synchronized (largeMessagesOnSync)
- {
- synchronized (largeMessages)
- {
- ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
- for (Entry<Long, LargeServerMessage> entry : largeMessages.entrySet())
- {
- Long id = entry.getKey();
- LargeServerMessage lm = entry.getValue();
- if (largeMessagesOnSync.containsKey(id))
- {
- SequentialFile sq = lm.getFile();
- LargeServerMessage mainLM = largeMessagesOnSync.get(id);
- SequentialFile mainSeqFile = mainLM.getFile();
- for (;;)
- {
- buffer.rewind();
- int size = sq.read(buffer);
- mainSeqFile.writeInternal(buffer);
- if (size < buffer.capacity())
- {
- break;
- }
- }
- }
- else
- {
- // these are large-messages created after sync started
- largeMessagesOnSync.put(id, lm);
- }
- }
- largeMessages.clear();
- largeMessages.putAll(largeMessagesOnSync);
- largeMessagesOnSync.clear();
- }
- }
- journalsHolder = null;
- quorumManager.setLiveID(liveID);
- server.setRemoteBackupUpToDate(liveID);
- log.info("Backup server " + server + " is synchronized with
live-server.");
- return;
- }
-
- private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws
Exception
- {
- Long id = Long.valueOf(msg.getId());
- byte[] data = msg.getData();
- SequentialFile channel;
- switch (msg.getFileType())
- {
- case LARGE_MESSAGE:
- {
- synchronized (largeMessagesOnSync)
- {
- LargeServerMessage largeMessage = largeMessagesOnSync.get(id);
- if (largeMessage == null)
- {
- largeMessage = storage.createLargeMessage();
- largeMessage.setDurable(true);
- largeMessage.setMessageID(id);
- largeMessagesOnSync.put(id, largeMessage);
- }
- channel = largeMessage.getFile();
- }
- break;
- }
- case PAGE:
- {
- Page page = getPage(msg.getPageStore(), (int)msg.getId());
-
- channel = page.getFile();
- break;
- }
- case JOURNAL:
- {
- JournalSyncFile journalSyncFile =
filesReservedForSync.get(msg.getJournalContent()).get(id);
- FileChannel channel2 = journalSyncFile.getChannel();
- if (data == null)
- {
- channel2.close();
- return;
- }
- channel2.write(ByteBuffer.wrap(data));
- return;
- }
- default:
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Unhandled
file type " + msg.getFileType());
- }
-
- if (data == null)
- {
- channel.close();
- return;
- }
-
- if (!channel.isOpen())
- {
- channel.open(1, false);
- }
- channel.writeDirect(ByteBuffer.wrap(data), true);
- }
-
- /**
- * Reserves files (with the given fileID) in the specified journal, and places a
- * {@link FileWrapperJournal} in place to store messages while synchronization is
going on.
- * @param packet
- * @throws Exception
- */
- private void handleStartReplicationSynchronization(final ReplicationStartSyncMessage
packet) throws Exception
- {
- if (server.isRemoteBackupUpToDate())
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup
can not be up-to-date!");
- }
-
- if (packet.isSynchronizationFinished())
- {
- finishSynchronization(packet.getNodeID());
- return;
- }
-
- final Journal journal = journalsHolder.get(packet.getJournalContentType());
- synchronized (this)
- {
- if (!started)
- return;
- Map<Long, JournalSyncFile> mapToFill =
filesReservedForSync.get(packet.getJournalContentType());
- log.info("Journal " + packet.getJournalContentType() + ".
Reserving fileIDs for synchronization: " +
- Arrays.toString(packet.getFileIds()));
-
- for (Entry<Long, JournalFile> entry :
journal.createFilesForBackupSync(packet.getFileIds()).entrySet())
- {
- mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
- }
- FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
- registerJournal(packet.getJournalContentType().typeByte, syncJournal);
- }
- }
-
- private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
- {
- LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), true);
-
- if (message != null)
- {
- try
- {
- message.deleteFile();
- }
- catch (Exception e)
- {
- log.warn("Error deleting large message ID = " +
packet.getMessageId(), e);
- }
- }
- }
-
- /**
- * @param packet
- */
- private void handleLargeMessageWrite(final ReplicationLargeMessageWriteMessage packet)
throws Exception
- {
- LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), false);
- if (message != null)
- {
- message.addBytes(packet.getBody());
- }
- }
-
- /**
- * @param request
- */
- private void handleCompareDataMessage(final ReplicationCompareDataMessage request)
throws HornetQException
- {
- compareJournalInformation(request.getJournalInformation());
- }
-
- private LargeServerMessage lookupLargeMessage(final long messageId, final boolean
delete)
- {
- LargeServerMessage message;
-
- if (delete)
- {
- message = largeMessages.remove(messageId);
- }
- else
- {
- message = largeMessages.get(messageId);
- if (message == null)
- {
- synchronized (largeMessages)
- {
- if (!server.isRemoteBackupUpToDate())
- {
- // in case we need to append data to a file while still sync'ing
the backup
- createLargeMessage(messageId, true);
- message = largeMessages.get(messageId);
- }
- }
- }
- }
-
- if (message == null)
- {
- log.warn("Large MessageID " + messageId +
- " is not available on backup server.
Ignoring replication message");
- }
-
- return message;
-
- }
-
- /**
- * @param packet
- */
- private void handleLargeMessageBegin(final ReplicationLargeMessageBeingMessage
packet)
- {
- final long id = packet.getMessageId();
- createLargeMessage(id, false);
- log.trace("Receiving Large Message " + id + " on backup");
- }
-
- private void createLargeMessage(final long id, boolean sync)
- {
- LargeServerMessage msg = storage.createLargeMessage();
- msg.setDurable(true);
- msg.setMessageID(id);
- msg.setReplicationSync(sync);
- largeMessages.put(id, msg);
- }
-
- /**
- * @param packet
- */
- private void handleCommitRollback(final ReplicationCommitMessage packet) throws
Exception
- {
- Journal journalToUse = getJournal(packet.getJournalID());
-
- if (packet.isRollback())
- {
- journalToUse.appendRollbackRecord(packet.getTxId(), packet.getSync());
- }
- else
- {
- journalToUse.appendCommitRecord(packet.getTxId(), packet.getSync());
- }
- }
-
- /**
- * @param packet
- */
- private void handlePrepare(final ReplicationPrepareMessage packet) throws Exception
- {
- Journal journalToUse = getJournal(packet.getJournalID());
-
- journalToUse.appendPrepareRecord(packet.getTxId(), packet.getRecordData(), false);
- }
-
- /**
- * @param packet
- */
- private void handleAppendDeleteTX(final ReplicationDeleteTXMessage packet) throws
Exception
- {
- Journal journalToUse = getJournal(packet.getJournalID());
-
- journalToUse.appendDeleteRecordTransactional(packet.getTxId(), packet.getId(),
packet.getRecordData());
- }
-
- /**
- * @param packet
- */
- private void handleAppendDelete(final ReplicationDeleteMessage packet) throws
Exception
- {
- Journal journalToUse = getJournal(packet.getJournalID());
-
- journalToUse.appendDeleteRecord(packet.getId(), false);
- }
-
- /**
- * @param packet
- */
- private void handleAppendAddTXRecord(final ReplicationAddTXMessage packet) throws
Exception
- {
- Journal journalToUse = getJournal(packet.getJournalID());
-
- if (packet.isUpdate())
- {
- journalToUse.appendUpdateRecordTransactional(packet.getTxId(),
- packet.getId(),
- packet.getRecordType(),
- packet.getRecordData());
- }
- else
- {
- journalToUse.appendAddRecordTransactional(packet.getTxId(),
- packet.getId(),
- packet.getRecordType(),
- packet.getRecordData());
- }
- }
-
- /**
- * @param packet
- * @throws Exception
- */
- private void handleAppendAddRecord(final ReplicationAddMessage packet) throws
Exception
- {
- Journal journalToUse = getJournal(packet.getJournalID());
-
- if (packet.isUpdate())
- {
- if (ReplicationEndpointImpl.trace)
- {
- log.trace("Endpoint appendUpdate id = " + packet.getId());
- }
- journalToUse.appendUpdateRecord(packet.getId(), packet.getRecordType(),
packet.getRecordData(), false);
- }
- else
- {
- if (ReplicationEndpointImpl.trace)
- {
- log.trace("Endpoint append id = " + packet.getId());
- }
- journalToUse.appendAddRecord(packet.getId(), packet.getRecordType(),
packet.getRecordData(), false);
- }
- }
-
- /**
- * @param packet
- */
- private void handlePageEvent(final ReplicationPageEventMessage packet) throws
Exception
- {
- ConcurrentMap<Integer, Page> pages = getPageMap(packet.getStoreName());
-
- Page page = pages.remove(packet.getPageNumber());
-
- if (page == null)
- {
- page = getPage(packet.getStoreName(), packet.getPageNumber());
- }
-
- if (page != null)
- {
- if (packet.isDelete())
- {
- if (deletePages)
- {
- page.delete(null);
- }
- }
- else
- {
- page.close();
- }
- }
-
- }
-
- /**
- * @param packet
- */
- private void handlePageWrite(final ReplicationPageWriteMessage packet) throws
Exception
- {
- PagedMessage pgdMessage = packet.getPagedMessage();
- pgdMessage.initMessage(storage);
- ServerMessage msg = pgdMessage.getMessage();
- Page page = getPage(msg.getAddress(), packet.getPageNumber());
- page.write(pgdMessage);
- }
-
- private ConcurrentMap<Integer, Page> getPageMap(final SimpleString storeName)
- {
- ConcurrentMap<Integer, Page> resultIndex = pageIndex.get(storeName);
-
- if (resultIndex == null)
- {
- resultIndex = new ConcurrentHashMap<Integer, Page>();
- ConcurrentMap<Integer, Page> mapResult = pageIndex.putIfAbsent(storeName,
resultIndex);
- if (mapResult != null)
- {
- resultIndex = mapResult;
- }
- }
-
- return resultIndex;
- }
-
- private Page getPage(final SimpleString storeName, final int pageId) throws Exception
- {
- ConcurrentMap<Integer, Page> map = getPageMap(storeName);
-
- Page page = map.get(pageId);
-
- if (page == null)
- {
- page = newPage(pageId, storeName, map);
- }
-
- return page;
- }
-
- /**
- * @param pageId
- * @param map
- * @return
- */
- private synchronized Page newPage(final int pageId,
- final SimpleString storeName,
- final ConcurrentMap<Integer, Page> map) throws
Exception
- {
- Page page = map.get(pageId);
-
- if (page == null)
- {
- page = pageManager.getPageStore(storeName).createPage(pageId);
- page.open();
- map.put(pageId, page);
- }
-
- return page;
- }
-
- /**
- * @param journalID
- * @return
- */
- private Journal getJournal(final byte journalID)
- {
- return journals[journalID];
- }
-
- public static class JournalSyncFile
- {
-
- private FileChannel channel;
- private final File file;
-
- public JournalSyncFile(JournalFile jFile) throws Exception
- {
- SequentialFile seqFile = jFile.getFile();
- file = seqFile.getJavaFile();
- seqFile.close();
- }
-
- FileChannel getChannel() throws Exception
- {
- if (channel == null)
- {
- channel = new FileOutputStream(file).getChannel();
- }
- return channel;
- }
-
- void close() throws IOException
- {
- if (channel != null)
- channel.close();
- }
-
- @Override
- public String toString()
- {
- return "JournalSyncFile(file=" + file.getAbsolutePath() +
")";
- }
- }
-
- @Override
- public void setQuorumManager(QuorumManager quorumManager)
- {
- this.quorumManager = quorumManager;
- }
-}
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -98,7 +98,7 @@
public ReplicationManagerImpl(CoreRemotingConnection remotingConnection, final
ExecutorFactory executorFactory)
{
this.executorFactory = executorFactory;
- replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+ this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id,
-1);
this.remotingConnection = remotingConnection;
}
@@ -110,9 +110,7 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecord(byte, long,
byte, org.hornetq.core.journal.EncodingSupport, boolean)
- */
+ @Override
public void appendUpdateRecord(final byte journalID,
final long id,
final byte recordType,
@@ -124,9 +122,7 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecord(byte, long,
boolean)
- */
+ @Override
public void appendDeleteRecord(final byte journalID, final long id) throws Exception
{
if (enabled)
@@ -585,10 +581,13 @@
}
@Override
- public void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType)
throws HornetQException
+ public
+ void
+ sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType,
String nodeID)
+
throws HornetQException
{
if (enabled)
- sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType));
+ sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType,
nodeID));
}
@Override
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -17,6 +17,7 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -47,19 +48,28 @@
import org.hornetq.utils.ExecutorFactory;
/**
- * This interface defines the internal interface of the HornetQ Server exposed to other
components of the server. The
- * external management interface of the HornetQ Server is defined by the
HornetQServerManagement interface This
- * interface is never exposed outside the HornetQ server, e.g. by JMX or other means
- *
+ * This interface defines the internal interface of the HornetQ Server exposed to other
components
+ * of the server.
+ * <p>
+ * The external management interface of the HornetQ Server is defined by the
HornetQServerManagement
+ * interface.
+ * <p>
+ * This interface is never exposed outside the HornetQ server, e.g. by JMX or other
means
* @author <a href="tim.fox(a)jboss.com">Tim Fox</a>
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
*/
public interface HornetQServer extends HornetQComponent
{
- /** This method was created mainly for testing but it may be used in scenarios where
- * you need to have more than one Server inside the same VM.
- * This identity will be exposed on logs what may help you to debug issues on the log
traces and debugs.*/
+ /**
+ * Sets the server identity.
+ * <p>
+ * The identity will be exposed on logs. It may help to debug issues on the log traces
and
+ * debugs.
+ * <p>
+ * This method was created mainly for testing but it may be used in scenarios where
you need to
+ * have more than one Server inside the same VM.
+ */
void setIdentity(String identity);
String getIdentity();
@@ -84,9 +94,7 @@
/**
* Returns the resource to manage this HornetQ server.
- *
- * Using this control will throw IllegalStateException if the
- * server is not properly started.
+ * @throws IllegalStateException if the server is not properly started.
*/
HornetQServerControlImpl getHornetQServerControl();
@@ -110,8 +118,6 @@
Set<ServerSession> getSessions();
- boolean isStarted();
-
boolean isStopped();
HierarchicalRepository<Set<Role>> getSecurityRepository();
@@ -185,8 +191,8 @@
* @param rc
* @param pair
* @param clusterConnection
- * @return {@code true} if replication started successfully, {@code false} otherwise
+ * @throws HornetQException
*/
- boolean startReplication(CoreRemotingConnection rc, ClusterConnection
clusterConnection,
- Pair<TransportConfiguration, TransportConfiguration> pair);
+ void startReplication(CoreRemotingConnection rc, ClusterConnection clusterConnection,
+ Pair<TransportConfiguration, TransportConfiguration> pair)
throws HornetQException;
}
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/ClusterConnection.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/ClusterConnection.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -27,7 +27,7 @@
* A ClusterConnection
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* Created 23 Jan 2009 14:51:55
*
*
@@ -37,32 +37,40 @@
SimpleString getName();
String getNodeID();
-
+
HornetQServer getServer();
-
+
void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean backup);
void addClusterTopologyListener(ClusterTopologyListener listener, boolean
clusterConnection);
-
+
void removeClusterTopologyListener(ClusterTopologyListener listener, boolean
clusterConnection);
-
+
/**
* @return a Map of node ID and addresses
*/
Map<String, String> getNodes();
void activate() throws Exception;
-
+
TransportConfiguration getConnector();
-
+
Topology getTopology();
-
+
void flushExecutor();
// for debug
String describe();
void informTopology();
-
+
void announceBackup();
+
+ /**
+ * Verifies whether user and password match the ones configured for this
ClusterConnection.
+ * @param clusterUser
+ * @param clusterPassword
+ * @return
+ */
+ boolean verify(String clusterUser, String clusterPassword);
}
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -1439,4 +1439,10 @@
return str.toString();
}
+
+ @Override
+ public boolean verify(String clusterUser0, String clusterPassword0)
+ {
+ return clusterUser.equals(clusterUser0) &&
clusterPassword.equals(clusterPassword0);
+ }
}
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -377,7 +377,8 @@
log.warn("No connector with name '" + config.getConnectorName()
+ "'. backup cannot be announced.");
return;
}
- liveChannel.send(new BackupRegistrationMessage(nodeUUID.toString(),
connector));
+ liveChannel.send(new BackupRegistrationMessage(nodeUUID.toString(), connector,
configuration.getClusterUser(),
+
configuration.getClusterPassword()));
}
else
{
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -88,11 +88,11 @@
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
+import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.replication.ReplicationManager;
-import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.Role;
@@ -227,6 +227,7 @@
private final Object initialiseLock = new Object();
private boolean initialised;
private final Object startUpLock = new Object();
+ private final Object replicationLock = new Object();
/**
* Only applicable to 'remote backup servers'. If this flag is false the
backup may not become
@@ -333,6 +334,13 @@
public synchronized void start() throws Exception
{
+ if (started)
+ {
+ log.debug("Server already started!");
+ return;
+ }
+
+ log.debug("Starting server " + this);
OperationContextImpl.clearContext();
try
@@ -393,7 +401,7 @@
{
assert replicationEndpoint == null;
backupUpToDate = false;
- replicationEndpoint = new ReplicationEndpointImpl(this,
shutdownOnCriticalIO);
+ replicationEndpoint = new ReplicationEndpoint(this,
shutdownOnCriticalIO);
activation = new SharedNothingBackupActivation();
}
@@ -545,6 +553,12 @@
pagingManager.stop();
}
+ if (replicationEndpoint != null)
+ {
+ replicationEndpoint.stop();
+ replicationEndpoint = null;
+ }
+
if (!criticalIOError && storageManager != null)
{
storageManager.stop();
@@ -556,12 +570,6 @@
replicationManager = null;
}
- if (replicationEndpoint != null)
- {
- replicationEndpoint.stop();
- replicationEndpoint = null;
- }
-
if (securityManager != null)
{
securityManager.stop();
@@ -577,36 +585,39 @@
postOffice.stop();
}
- List<Runnable> tasks = scheduledPool.shutdownNow();
+ if (scheduledPool != null)
+ {
+ List<Runnable> tasks = scheduledPool.shutdownNow();
+ for (Runnable task : tasks)
+ {
+ HornetQServerImpl.log.info(this + "::Waiting for " + task);
+ }
+ }
- for (Runnable task : tasks)
- {
- HornetQServerImpl.log.info(this + "::Waiting for " + task);
- }
-
if (memoryManager != null)
{
memoryManager.stop();
}
- threadPool.shutdown();
- scheduledPool.shutdown();
-
- try
- {
- if (!threadPool.awaitTermination(10, TimeUnit.SECONDS))
+ if (threadPool != null)
{
- HornetQServerImpl.log.warn("Timed out waiting for pool to
terminate");
+ threadPool.shutdown();
+ try
+ {
+ if (!threadPool.awaitTermination(10, TimeUnit.SECONDS))
+ {
+ HornetQServerImpl.log.warn("Timed out waiting for pool to
terminate");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore
+ }
+ threadPool = null;
}
- }
- catch (InterruptedException e)
- {
- // Ignore
- }
- threadPool = null;
- try
+ try
{
if (!scheduledPool.awaitTermination(10, TimeUnit.SECONDS))
{
@@ -621,7 +632,6 @@
securityStore.stop();
threadPool = null;
-
scheduledPool = null;
pagingManager = null;
@@ -1199,7 +1209,7 @@
{
if (configuration.isPersistenceEnabled())
{
- return new JournalStorageManager(configuration, executorFactory,
replicationManager, shutdownOnCriticalIO);
+ return new JournalStorageManager(configuration, executorFactory,
shutdownOnCriticalIO);
}
else
{
@@ -1285,7 +1295,7 @@
if (ConfigurationImpl.DEFAULT_CLUSTER_USER.equals(configuration.getClusterUser())
&&
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD.equals(configuration.getClusterPassword()))
{
- log.warn("Security risk! It has been detected that the cluster admin user
and password " + "have not been changed from the installation default. "
+ log.warn("Security risk! HornetQ is running with the default cluster admin
user and default password. "
+ "Please see the HornetQ user guide, cluster chapter, for
instructions on how to do this.");
}
@@ -2020,7 +2030,7 @@
private final class SharedNothingBackupActivation implements Activation
{
- private ServerLocatorInternal serverLocator;
+ private ServerLocatorInternal serverLocator0;
private volatile boolean failedConnection;
public void run()
@@ -2040,12 +2050,12 @@
clusterManager.start();
final TransportConfiguration config =
configuration.getConnectorConfigurations().get(liveConnectorName);
- serverLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
- final QuorumManager quorumManager = new QuorumManager(serverLocator);
+ serverLocator0 =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
+ final QuorumManager quorumManager = new QuorumManager(serverLocator0);
replicationEndpoint.setQuorumManager(quorumManager);
- serverLocator.setReconnectAttempts(-1);
-
+ serverLocator0.setReconnectAttempts(-1);
+ serverLocator0.addInterceptor(new ReplicationError(HornetQServerImpl.this));
threadPool.execute(new Runnable()
{
@Override
@@ -2053,7 +2063,7 @@
{
try
{
- final ClientSessionFactory liveServerSessionFactory =
serverLocator.connect();
+ final ClientSessionFactory liveServerSessionFactory =
serverLocator0.connect();
if (liveServerSessionFactory == null)
{
// XXX HORNETQ-768
@@ -2062,7 +2072,6 @@
CoreRemotingConnection liveConnection =
liveServerSessionFactory.getConnection();
Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id,
-1);
Channel replicationChannel =
liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
-
connectToReplicationEndpoint(replicationChannel);
replicationEndpoint.start();
clusterManager.announceReplicatingBackup(pingChannel);
@@ -2109,7 +2118,7 @@
}
}
- serverLocator.close();
+ serverLocator0.close();
replicationEndpoint.stop();
if (failedConnection)
@@ -2146,10 +2155,10 @@
public void close(final boolean permanently) throws Exception
{
- if (serverLocator != null)
+ if (serverLocator0 != null)
{
- serverLocator.close();
- serverLocator = null;
+ serverLocator0.close();
+ serverLocator0 = null;
}
if (configuration.isBackup())
@@ -2256,33 +2265,68 @@
@Override
- public boolean startReplication(CoreRemotingConnection rc, ClusterConnection
clusterConnection,
- Pair<TransportConfiguration, TransportConfiguration> pair)
+ public void startReplication(CoreRemotingConnection rc, ClusterConnection
clusterConnection,
+ Pair<TransportConfiguration, TransportConfiguration>
pair) throws HornetQException
{
if (replicationManager != null)
{
- return false;
+ throw new HornetQException(HornetQException.ALREADY_REPLICATING);
}
- replicationManager = new ReplicationManagerImpl(rc, executorFactory);
- try
+ if (!isStarted())
{
- replicationManager.start();
- storageManager.startReplication(replicationManager, pagingManager,
getNodeID().toString(), clusterConnection,
- pair);
- return true;
+ throw new IllegalStateException();
}
- catch (Exception e)
+
+ synchronized (replicationLock)
{
- /*
- * The reasoning here is that the exception was either caused by (1) the
(interaction with)
- * the backup, or (2) by an IO Error at the storage. If (1), we can swallow the
exception
- * and ignore the replication request. If (2) the live will crash shortly.
- */
- // HORNETQ-720 Need to verify whether swallowing the exception here is
acceptable
- log.warn("Exception when trying to start replication", e);
- replicationManager = null;
- return false;
+
+ if (replicationManager != null)
+ {
+ throw new HornetQException(HornetQException.ALREADY_REPLICATING);
+ }
+
+ rc.addFailureListener(new ReplicationFailureListener());
+ replicationManager = new ReplicationManagerImpl(rc, executorFactory);
+
+ try
+ {
+ replicationManager.start();
+ storageManager.startReplication(replicationManager, pagingManager,
getNodeID().toString(),
+ clusterConnection, pair);
+ }
+ catch (Exception e)
+ {
+ /*
+ * The reasoning here is that the exception was either caused by (1) the
(interaction
+ * with) the backup, or (2) by an IO Error at the storage. If (1), we can
swallow the
+ * exception and ignore the replication request. If (2) the live will crash
shortly.
+ */
+ log.warn("Exception when trying to start replication", e);
+
+ try
+ {
+ if (replicationManager != null)
+ replicationManager.stop();
+ }
+ catch (Exception hqe)
+ {
+ log.warn("Exception while trying to close replicationManager",
hqe);
+ }
+ finally
+ {
+ replicationManager = null;
+ }
+
+ if (e instanceof HornetQException)
+ {
+ throw (HornetQException)e;
+ }
+ else
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Error
trying to start replication", e);
+ }
+ }
}
}
@@ -2303,4 +2347,28 @@
nodeManager.setNodeID(nodeID);
backupUpToDate = true;
}
+
+ private final class ReplicationFailureListener implements FailureListener
+ {
+
+ @Override
+ public void connectionFailed(HornetQException exception, boolean failedOver)
+ {
+ Executors.newSingleThreadExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ synchronized (replicationLock)
+
+ {
+ if (replicationManager != null)
+ {
+ storageManager.stopReplication();
+ }
+ }
+ }
+ });
+ }
+ }
+
}
Copied:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/impl/ReplicationError.java
(from rev 11800,
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ReplicationError.java)
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/impl/ReplicationError.java
(rev 0)
+++
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/server/impl/ReplicationError.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -0,0 +1,50 @@
+/**
+ *
+ */
+package org.hornetq.core.server.impl;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+
+/**
+ * Stops the backup in case of an error at the start of Replication.
+ * <p>
+ * Using an interceptor for the task to avoid a server reference inside of the
'basic' channel-0
+ * handler at {@link ClientSessionFactoryImpl#Channel0Handler}. As {@link
ClientSessionFactoryImpl}
+ * is also shipped in the HQ-client JAR (which does not include {@link HornetQServer}).
+ */
+final class ReplicationError implements Interceptor
+{
+ private final HornetQServer server;
+ private static final Logger log = Logger.getLogger(ReplicationError.class);
+
+ public ReplicationError(HornetQServer server)
+ {
+ this.server = server;
+ }
+
+ @Override
+ public boolean intercept(Packet packet, RemotingConnection connection) throws
HornetQException
+ {
+ if (packet.getType() != PacketImpl.BACKUP_REGISTRATION_FAILED)
+ return true;
+ log.warn("Failed to register as backup. Stopping the server.");
+ try
+ {
+ server.stop();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "error trying
to stop " + server, e);
+ }
+
+ return false;
+ }
+
+}
Modified:
branches/HORNETQ-316/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
---
branches/HORNETQ-316/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -88,14 +88,7 @@
{
for (ServerLocator locator : locators)
{
- try
- {
- locator.close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
+ closeServerLocator(locator);
}
locators.clear();
super.tearDown();
@@ -122,6 +115,20 @@
}
}
+ public static final void closeSessionFactory(final ClientSessionFactory sf)
+ {
+ if (sf == null)
+ return;
+ try
+ {
+ sf.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
protected void waitForTopology(final HornetQServer server, final int nodes) throws
Exception
{
waitForTopology(server, nodes, WAIT_TIMEOUT);
@@ -708,9 +715,9 @@
}
}
-
- protected final void receiveMessagesAndAck(ClientConsumer consumer, final int start,
int msgCount)
- throws HornetQException
+ protected final void
+ receiveMessages(ClientConsumer consumer, final int start, final int msgCount,
final boolean ack)
+
throws HornetQException
{
for (int i = start; i < msgCount; i++)
{
@@ -718,7 +725,8 @@
Assert.assertNotNull("Expecting a message " + i, message);
assertMessageBody(i, message);
Assert.assertEquals(i, message.getIntProperty("counter").intValue());
- message.acknowledge();
+ if (ack)
+ message.acknowledge();
}
}
Modified:
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/persistence/JMSStorageManager.java
===================================================================
---
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/persistence/JMSStorageManager.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/persistence/JMSStorageManager.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -41,21 +41,21 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
+
void load() throws Exception;
-
+
void storeDestination(PersistedDestination destination) throws Exception;
void deleteDestination(PersistedType type, String name) throws Exception;
-
+
List<PersistedDestination> recoverDestinations();
-
+
void deleteConnectionFactory(String connectionFactory) throws Exception;
-
+
void storeConnectionFactory(PersistedConnectionFactory connectionFactory) throws
Exception;
-
+
List<PersistedConnectionFactory> recoverConnectionFactories();
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -65,17 +65,17 @@
// Inner classes -------------------------------------------------
void addJNDI(PersistedType type, String name, String ... address) throws Exception;
-
+
List<PersistedJNDI> recoverPersistedJNDI() throws Exception;
-
+
void deleteJNDI(PersistedType type, String name, String address) throws Exception;
-
+
void deleteJNDI(PersistedType type, String name) throws Exception;
/**
* Add the journal here to the replication endpoint
* @param replicationEndpoint
- * @throws Exception
+ * @throws Exception
*/
void installReplication(ReplicationEndpoint replicationEndpoint) throws Exception;
}
Modified:
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java
===================================================================
---
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -55,27 +55,27 @@
private final byte CF_RECORD = 1;
private final byte DESTINATION_RECORD = 2;
-
+
private final byte JNDI_RECORD = 3;
-
+
// Attributes ----------------------------------------------------
private final IDGenerator idGenerator;
-
+
private final String journalDir;
-
+
private final boolean createDir;
-
+
private final Journal jmsJournal;
private volatile boolean started;
-
- private Map<String, PersistedConnectionFactory> mapFactories = new
ConcurrentHashMap<String, PersistedConnectionFactory>();
- private Map<Pair<PersistedType, String>, PersistedDestination>
destinations = new ConcurrentHashMap<Pair<PersistedType, String>,
PersistedDestination>();
-
- private Map<Pair<PersistedType, String>, PersistedJNDI> mapJNDI = new
ConcurrentHashMap<Pair<PersistedType, String>, PersistedJNDI>();
+ private final Map<String, PersistedConnectionFactory> mapFactories = new
ConcurrentHashMap<String, PersistedConnectionFactory>();
+ private final Map<Pair<PersistedType, String>, PersistedDestination>
destinations = new ConcurrentHashMap<Pair<PersistedType, String>,
PersistedDestination>();
+
+ private final Map<Pair<PersistedType, String>, PersistedJNDI> mapJNDI =
new ConcurrentHashMap<Pair<PersistedType, String>, PersistedJNDI>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -117,7 +117,7 @@
{
jmsJournal = localJMS;
}
-
+
this.idGenerator = idGenerator;
}
@@ -144,7 +144,7 @@
jmsJournal.appendAddRecord(id, CF_RECORD, connectionFactory, true);
mapFactories.put(connectionFactory.getName(), connectionFactory);
}
-
+
public void deleteConnectionFactory(final String cfName) throws Exception
{
PersistedConnectionFactory oldCF = mapFactories.remove(cfName);
@@ -152,7 +152,7 @@
{
jmsJournal.appendDeleteRecord(oldCF.getId(), false);
}
-
+
this.deleteJNDI(PersistedType.ConnectionFactory, cfName);
}
@@ -177,22 +177,22 @@
jmsJournal.appendAddRecord(id, DESTINATION_RECORD, destination, true);
destinations.put(new Pair<PersistedType, String>(destination.getType(),
destination.getName()), destination);
}
-
+
public List<PersistedJNDI> recoverPersistedJNDI() throws Exception
{
ArrayList<PersistedJNDI> list = new ArrayList<PersistedJNDI>();
-
+
list.addAll(mapJNDI.values());
-
+
return list;
}
-
+
public void addJNDI(PersistedType type, String name, String ... address) throws
Exception
{
Pair<PersistedType, String> key = new Pair<PersistedType, String>(type,
name);
long tx = idGenerator.generateID();
-
+
PersistedJNDI currentJNDI = mapJNDI.get(key);
if (currentJNDI != null)
{
@@ -202,30 +202,30 @@
{
currentJNDI = new PersistedJNDI(type, name);
}
-
+
mapJNDI.put(key, currentJNDI);
-
+
for (String adItem : address)
{
currentJNDI.addJNDI(adItem);
}
-
+
long newId = idGenerator.generateID();
-
+
currentJNDI.setId(newId);
-
+
jmsJournal.appendAddRecordTransactional(tx, newId, JNDI_RECORD, currentJNDI);
-
+
jmsJournal.appendCommitRecord(tx, true);
}
-
+
public void deleteJNDI(PersistedType type, String name, String address) throws
Exception
{
Pair<PersistedType, String> key = new Pair<PersistedType, String>(type,
name);
long tx = idGenerator.generateID();
-
+
PersistedJNDI currentJNDI = mapJNDI.get(key);
if (currentJNDI == null)
{
@@ -235,9 +235,9 @@
{
jmsJournal.appendDeleteRecordTransactional(tx, currentJNDI.getId());
}
-
+
currentJNDI.deleteJNDI(address);
-
+
if (currentJNDI.getJndi().size() == 0)
{
mapJNDI.remove(key);
@@ -248,15 +248,15 @@
currentJNDI.setId(newId);
jmsJournal.appendAddRecordTransactional(tx, newId, JNDI_RECORD, currentJNDI);
}
-
+
jmsJournal.appendCommitRecord(tx, true);
}
-
+
public void deleteJNDI(PersistedType type, String name) throws Exception
{
Pair<PersistedType, String> key = new Pair<PersistedType, String>(type,
name);
-
+
PersistedJNDI currentJNDI = mapJNDI.remove(key);
if (currentJNDI != null)
@@ -293,25 +293,21 @@
checkAndCreateDir(journalDir, createDir);
jmsJournal.start();
-
+
started = true;
}
- /* (non-Javadoc)
- * @see
org.hornetq.jms.persistence.JMSStorageManager#installReplication(org.hornetq.core.replication.ReplicationEndpoint)
- */
+ @Override
public void installReplication(ReplicationEndpoint replicationEndpoint) throws
Exception
{
jmsJournal.loadInternalOnly();
replicationEndpoint.registerJournal((byte)2, this.jmsJournal);
}
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#stop()
- */
+
+ @Override
public void stop() throws Exception
{
this.started = false;
@@ -321,13 +317,13 @@
public void load() throws Exception
{
mapFactories.clear();
-
+
List<RecordInfo> data = new ArrayList<RecordInfo>();
-
+
ArrayList<PreparedTransactionInfo> list = new
ArrayList<PreparedTransactionInfo>();
-
+
jmsJournal.load(data, list, null);
-
+
for (RecordInfo record : data)
{
long id = record.id;
@@ -335,7 +331,7 @@
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(record.data);
byte rec = record.getUserRecordType();
-
+
if (rec == CF_RECORD)
{
PersistedConnectionFactory cf = new PersistedConnectionFactory();
@@ -362,9 +358,9 @@
{
throw new IllegalStateException("Invalid record type " + rec);
}
-
+
}
-
+
}
// Package protected ---------------------------------------------
Modified:
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
===================================================================
---
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -38,7 +38,7 @@
*/
public void deleteConnectionFactory(String connectionFactory) throws Exception
{
-
+
}
/* (non-Javadoc)
Modified:
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
===================================================================
---
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-jms/src/main/java/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -13,17 +13,13 @@
package org.hornetq.jms.server.recovery;
-import java.util.Map;
-
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.logging.Logger;
@@ -58,7 +54,7 @@
private XAResource delegate;
- private XARecoveryConfig[] xaRecoveryConfigs;
+ private final XARecoveryConfig[] xaRecoveryConfigs;
//private TransportConfiguration currentConnection;
Modified:
branches/HORNETQ-316/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
---
branches/HORNETQ-316/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -123,6 +123,7 @@
@Override
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport
record) throws Exception
{
+ count(txID);
JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id,
record);
writeRecord(deleteRecordTX, false, null);
}
Modified:
branches/HORNETQ-316/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/HORNETQ-316/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -2008,7 +2008,6 @@
if (replicationSync)
{
assert filesRepository.getDataFiles().isEmpty();
- filesRepository.ensureMinFiles();
setJournalState(JournalState.SYNCING);
return new JournalLoadInformation(0, -1);
}
Modified:
branches/HORNETQ-316/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
===================================================================
---
branches/HORNETQ-316/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-ra/hornetq-ra-jar/src/main/java/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -15,7 +15,6 @@
import java.io.PrintWriter;
import java.util.Iterator;
-import java.util.List;
import java.util.Set;
import javax.jms.ConnectionMetaData;
Modified:
branches/HORNETQ-316/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/JMSTest.java
===================================================================
---
branches/HORNETQ-316/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/JMSTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/JMSTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -1,18 +1,11 @@
package org.hornetq.rest.test;
-import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQDestination;
-import org.hornetq.jms.client.HornetQJMSConnectionFactory;
-import org.hornetq.rest.HttpHeaderProperty;
-import org.hornetq.rest.Jms;
-import org.hornetq.rest.queue.QueueDeployment;
-import org.jboss.resteasy.client.ClientRequest;
-import org.jboss.resteasy.client.ClientResponse;
-import org.jboss.resteasy.spi.Link;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import static org.jboss.resteasy.test.TestPortProvider.generateURL;
+import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
@@ -23,11 +16,18 @@
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.xml.bind.annotation.XmlRootElement;
-import java.io.Serializable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import static org.jboss.resteasy.test.TestPortProvider.*;
+import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.rest.HttpHeaderProperty;
+import org.hornetq.rest.Jms;
+import org.hornetq.rest.queue.QueueDeployment;
+import org.jboss.resteasy.client.ClientRequest;
+import org.jboss.resteasy.client.ClientResponse;
+import org.jboss.resteasy.spi.Link;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
/**
* @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
@@ -46,6 +46,7 @@
@XmlRootElement
public static class Order implements Serializable
{
+ private static final long serialVersionUID = 1397854679589606480L;
private String name;
private String amount;
Modified:
branches/HORNETQ-316/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java
===================================================================
---
branches/HORNETQ-316/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -1,6 +1,19 @@
package org.hornetq.rest.test;
-import org.hornetq.jms.client.HornetQConnectionFactory;
+import static org.jboss.resteasy.test.TestPortProvider.generateURL;
+
+import java.io.Serializable;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.xml.bind.annotation.XmlRootElement;
+
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.rest.HttpHeaderProperty;
@@ -14,19 +27,6 @@
import org.junit.BeforeClass;
import org.junit.Test;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.io.Serializable;
-
-import static org.jboss.resteasy.test.TestPortProvider.*;
-
/**
* @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
* @version $Revision: 1 $
@@ -157,7 +157,7 @@
@Path("two")
public void two(Order order)
{
- twoOrder = order;
+ twoOrder = order;
}
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -79,6 +79,8 @@
public class PagingTest extends ServiceTestBase
{
private ServerLocator locator;
+ private HornetQServer server;
+ private ClientSessionFactory sf;
static final int MESSAGE_SIZE = 1024; // 1k
public PagingTest(final String name)
@@ -120,6 +122,8 @@
@Override
protected void tearDown() throws Exception
{
+ stopComponent(server);
+ closeSessionFactory(sf);
closeServerLocator(locator);
super.tearDown();
}
@@ -132,8 +136,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
- config,
+ server =
+ createServer(true, config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
@@ -146,15 +150,13 @@
final int messagesPerTX = numberOfMessages / numberOfTX;
- try
- {
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -341,18 +343,6 @@
Thread.sleep(100);
}
assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testReceiveImmediate() throws Exception
@@ -363,7 +353,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -375,15 +366,15 @@
final int numberOfMessages = 1000;
+ ServerLocator locator = createInVMNonHALocator();
try
{
- ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -442,16 +433,12 @@
assertEquals(numberOfMessages, queue.getMessageCount());
- LinkedList<Xid> xids = new LinkedList<Xid>();
-
- int msgReceived = 0;
ClientSession sessionConsumer = sf.createSession(false, false, false);
sessionConsumer.start();
ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
{
log.info("Received " + msgCount);
- msgReceived++;
ClientMessage msg = consumer.receiveImmediate();
if (msg == null)
{
@@ -487,15 +474,9 @@
}
finally
{
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
+ closeServerLocator(locator);
+ stopComponent(server);
}
-
}
/**
@@ -510,7 +491,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -522,15 +504,13 @@
final int numberOfMessages = 1000;
- try
- {
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -716,20 +696,6 @@
sessionConsumer.commit();
sessionConsumer.close();
-
-
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testMissingTXEverythingAcked() throws Exception
@@ -740,7 +706,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -764,7 +731,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -922,7 +889,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -946,7 +914,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -1086,7 +1054,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1106,7 +1075,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -1221,7 +1190,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1311,7 +1281,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -1522,7 +1492,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1552,7 +1523,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -1705,7 +1676,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1725,7 +1697,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, true, true, false,
0);
@@ -1855,7 +1827,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1870,7 +1843,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, true, true, false,
0);
@@ -1997,7 +1970,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2014,7 +1988,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
byte[] body = new byte[MESSAGE_SIZE];
@@ -2147,7 +2121,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2164,7 +2139,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
byte[] body = new byte[MESSAGE_SIZE];
@@ -2277,7 +2252,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2302,7 +2278,7 @@
try
{
- final ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
final byte[] body = new byte[MESSAGE_SIZE];
@@ -2408,7 +2384,8 @@
Configuration config = createDefaultConfig();
- final HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_SIZE * 2,
@@ -2429,7 +2406,7 @@
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- final ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
final CountDownLatch ready = new CountDownLatch(1);
@@ -2552,7 +2529,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2571,7 +2549,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, true, true, false,
0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -2685,7 +2663,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2704,7 +2683,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, false, true, false,
0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -2758,7 +2737,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2777,7 +2757,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, false, false, false,
0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -2860,7 +2840,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2877,7 +2858,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, false, false, false,
0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -3004,7 +2985,7 @@
settings.put(PagingTest.ADDRESS.toString(), set);
- HornetQServer server = createServer(true, config, 1024, 10 * 1024, settings);
+ server = createServer(true, config, 1024, 10 * 1024, settings);
server.start();
@@ -3017,7 +2998,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, true, true, false,
0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -3146,7 +3127,7 @@
settings.put(PagingTest.ADDRESS.toString(), set);
- HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
+ server = createServer(true, config, 1024, 1024 * 1024, settings);
server.start();
@@ -3157,7 +3138,7 @@
locator.setAckBatchSize(0);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession();
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -3240,7 +3221,8 @@
int NUMBER_OF_MESSAGES = 2;
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -3255,7 +3237,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(null, null, false, !transacted, true,
false, 0);
for (int i = 0; i < NUMBER_OF_BINDINGS; i++)
@@ -3347,7 +3329,8 @@
{
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -3447,7 +3430,8 @@
{
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -3558,13 +3542,13 @@
addresses.put(PAGED_ADDRESS.toString(), pagedDestination);
- HornetQServer server = createServer(true, configuration, -1, -1, addresses);
+ server = createServer(true, configuration, -1, -1, addresses);
try
{
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, false);
@@ -3667,13 +3651,13 @@
addresses.put(PAGED_ADDRESS_B.toString(), pagedDestinationB);
- HornetQServer server = createServer(true, configuration, -1, -1, addresses);
+ server = createServer(true, configuration, -1, -1, addresses);
try
{
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, false);
@@ -3784,7 +3768,7 @@
config.setJournalSyncNonTransactional(false);
config.setJournalFileSize(10 * 1024 * 1024);
- HornetQServer server = createServer(true, config, 512 * 1024, 1024 * 1024, new
HashMap<String, AddressSettings>());
+ server = createServer(true, config, 512 * 1024, 1024 * 1024, new HashMap<String,
AddressSettings>());
server.start();
@@ -3800,7 +3784,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- final ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(true, true);
@@ -3925,7 +3909,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -3949,7 +3934,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -4050,7 +4035,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -4074,7 +4060,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -4177,7 +4163,7 @@
dla.setDeadLetterAddress(new SimpleString("DLA"));
settings.put(ADDRESS.toString(), dla);
- final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX, settings);
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX,
settings);
server.start();
@@ -4394,15 +4380,6 @@
finally
{
session.close();
- sf.close();
- locator.close();
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
}
}
@@ -4422,21 +4399,21 @@
dla.setExpiryAddress(new SimpleString("DLA"));
settings.put(ADDRESS.toString(), dla);
- final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX, settings);
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX,
settings);
server.start();
final int messageSize = 20;
+ ServerLocator locator = createInVMNonHALocator();
try
{
- ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, false, false);
@@ -4552,14 +4529,8 @@
}
finally
{
- locator.close();
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
+ closeServerLocator(locator);
+ stopComponent(server);
}
}
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -181,7 +181,7 @@
}
for (int i = 0; i < MAX_SERVERS; i++)
- {
+ {
stopComponent(nodeManagers[i]);
}
UnitTestCase.checkFreePort(ClusterTestBase.PORTS);
@@ -243,6 +243,12 @@
}
}
}
+
+ @Override
+ public String toString()
+ {
+ return "id=" + id + ", consumer=" + consumer + ",
session=" + session;
+ }
}
protected ClientConsumer getConsumer(final int node)
@@ -569,8 +575,6 @@
if (holder != null)
{
holder.close();
- // holder.session.close();
-
consumers[i] = null;
}
}
@@ -580,14 +584,8 @@
{
for (int i = 0; i < sfs.length; i++)
{
- ClientSessionFactory sf = sfs[i];
-
- if (sf != null)
- {
- sf.close();
-
- sfs[i] = null;
- }
+ closeSessionFactory(sfs[i]);
+ sfs[i] = null;
}
}
@@ -595,14 +593,8 @@
{
for (int i = 0; i < locators.length; i++)
{
- ServerLocator sf = locators[i];
-
- if (sf != null)
- {
- sf.close();
-
- locators[i] = null;
- }
+ closeServerLocator(locators[i]);
+ locators[i] = null;
}
}
@@ -1237,7 +1229,7 @@
Assert.assertTrue(counts.contains(messageCount));
}
- LinkedList[] lists = new LinkedList[consumerIDs.length];
+ List<LinkedList<Integer>> lists = new
ArrayList<LinkedList<Integer>>(consumerIDs.length);
for (int i = 0; i < messageCounts.length; i++)
{
@@ -1247,7 +1239,7 @@
if (elem == messageCounts[i])
{
- lists[i] = list;
+ lists.set(i, list);
break;
}
@@ -1257,17 +1249,17 @@
for (int messageCount : messageCounts)
{
- LinkedList list = lists[index];
+ LinkedList<Integer> list = lists.get(index);
Assert.assertNotNull(list);
- int elem = (Integer)list.poll();
+ int elem = list.poll();
Assert.assertEquals(messageCount, elem);
index++;
- if (index == lists.length)
+ if (index == consumerIDs.length)
{
index = 0;
}
Copied:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupAuthenticationTest.java
(from rev 11800,
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupAuthenticationTest.java)
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupAuthenticationTest.java
(rev 0)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupAuthenticationTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -0,0 +1,78 @@
+/**
+ *
+ */
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.util.TransportConfigurationUtils;
+
+public class BackupAuthenticationTest extends FailoverTestBase
+{
+ private static CountDownLatch latch;
+ @Override
+ public void setUp() throws Exception
+ {
+ startBackupServer = false;
+ latch = new CountDownLatch(1);
+ super.setUp();
+ }
+
+ public void testPasswordSetting() throws Exception
+ {
+ waitForServer(liveServer.getServer());
+ backupServer.start();
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ /*
+ * can't intercept the message at the backup, so we intercept the registration
message at the
+ * live.
+ */
+ Thread.sleep(2000);
+ assertFalse("backup should have stopped", backupServer.isStarted());
+ backupConfig.setClusterPassword(CLUSTER_PASSWORD);
+ backupServer.start();
+ waitForRemoteBackup(null, 5, true, backupServer.getServer());
+ }
+
+ @Override
+ protected void createConfigs() throws Exception
+ {
+ createReplicatedConfigs();
+ backupConfig.setClusterPassword("crocodile");
+
liveConfig.setInterceptorClassNames(Arrays.asList(NotifyingInterceptor.class.getName()));
+ }
+
+ @Override
+ protected TransportConfiguration getAcceptorTransportConfiguration(boolean live)
+ {
+ return TransportConfigurationUtils.getInVMAcceptor(live);
+ }
+
+ @Override
+ protected TransportConfiguration getConnectorTransportConfiguration(boolean live)
+ {
+ return TransportConfigurationUtils.getInVMConnector(live);
+ }
+
+ public static final class NotifyingInterceptor implements Interceptor
+ {
+
+ @Override
+ public boolean intercept(Packet packet, RemotingConnection connection) throws
HornetQException
+ {
+ if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
+ {
+ latch.countDown();
+ }
+ return true;
+ }
+ }
+}
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -43,6 +43,7 @@
syncDelay = new BackupSyncDelay(backupServer, liveServer);
}
+
public void testNodeID() throws Exception
{
startBackupFinishSyncing();
@@ -85,8 +86,7 @@
finishSyncAndFailover();
JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
- System.out.println("backup journal " + backupMsgJournal);
- System.out.println("live journal " + messageJournal);
+
assertEquals("file sizes must be the same", size,
backupMsgJournal.getFileSize());
Set<Long> backupIds = getFileIds(backupMsgJournal);
assertEquals("File IDs must match!", liveIds, backupIds);
@@ -182,7 +182,7 @@
{
session.start();
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
- receiveMessagesAndAck(consumer, start, end);
+ receiveMessages(consumer, start, end, true);
consumer.close();
session.commit();
}
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -35,9 +35,13 @@
public void testDeleteLargeMessages() throws Exception
{
+ File dir = new
File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
+ assertEquals("Should not have any large messages... previous test failed to
clean up?", 0,
+ getAllMessageFileIds(dir).size());
createProducerSendSomeMessages();
startBackupFinishSyncing();
- File dir = new
File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
+ // File dir = new
+ // File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
receiveMsgsInRange(0, n_msgs / 2);
assertEquals("we really ought to delete these after delivery", n_msgs /
2, getAllMessageFileIds(dir).size());
}
@@ -45,11 +49,15 @@
private Set<Long> getAllMessageFileIds(File dir)
{
Set<Long> idsOnBkp = new HashSet<Long>();
- for (String filename : dir.list())
+ String[] fileList = dir.list();
+ if (fileList != null)
{
- if (filename.endsWith(".msg"))
+ for (String filename : fileList)
{
- idsOnBkp.add(Long.valueOf(filename.split("\\.")[0]));
+ if (filename.endsWith(".msg"))
+ {
+ idsOnBkp.add(Long.valueOf(filename.split("\\.")[0]));
+ }
}
}
return idsOnBkp;
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -56,9 +56,6 @@
protected void setUp() throws Exception
{
super.setUp();
-
- //FailoverManagerImpl.enableDebug();
-
setupServers();
}
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -22,7 +22,6 @@
package org.hornetq.tests.integration.cluster.failover;
-import org.hornetq.core.logging.Logger;
/**
* A DiscoveryClusterWithBackupFailoverTest
@@ -33,7 +32,6 @@
*/
public class DiscoveryClusterWithBackupFailoverTest extends
ClusterWithBackupFailoverTestBase
{
- private static final Logger log =
Logger.getLogger(DiscoveryClusterWithBackupFailoverTest.class);
protected final String groupAddress = getUDPDiscoveryAddress();
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -254,7 +254,7 @@
ClusterConnectionConfiguration cccLive = new
ClusterConnectionConfiguration("cluster1", "jms",
backupConnector.getName(), -1, false, false, 1, 1,
dgb, false);
backupConfig.getClusterConfigurations().add(cccLive);
- backupServer = createBackupServer();
+ backupServer = createServer(backupConfig);
liveConfig = super.createDefaultConfig();
liveConfig.getAcceptorConfigurations().clear();
@@ -272,7 +272,7 @@
liveConfig.getClusterConfigurations().add(ccc0);
liveConfig.getConnectorConfigurations().put(liveConnector.getName(),
liveConnector);
liveConfig.getConnectorConfigurations().put(backupConnector.getName(),
backupConnector);
- liveServer = createLiveServer();
+ liveServer = createServer(liveConfig);
}
@Override
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -165,7 +165,7 @@
dgb, false);
backupConfig.getClusterConfigurations().add(cccLive);
backupConfig.setAllowAutoFailBack(false);
- backupServer = createBackupServer();
+ backupServer = createServer(backupConfig);
liveConfig = super.createDefaultConfig();
liveConfig.getAcceptorConfigurations().clear();
@@ -183,7 +183,7 @@
liveConfig.getConnectorConfigurations().put(liveConnector.getName(),
liveConnector);
liveConfig.getConnectorConfigurations().put(backupConnector.getName(),
backupConnector);
liveConfig.setAllowAutoFailBack(false);
- liveServer = createLiveServer();
+ liveServer = createServer(liveConfig);
}
@Override
@@ -251,6 +251,7 @@
* @param message
* @throws Exception
*/
+ @Override
protected void setBody(final int i, final ClientMessage message) throws Exception
{
message.getBodyBuffer().writeString("message" + i);
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -41,7 +41,6 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.integration.cluster.util.TestableServer;
@@ -58,7 +57,6 @@
*/
public class FailoverTest extends FailoverTestBase
{
- private static final Logger log = Logger.getLogger(FailoverTest.class);
private static final int NUM_MESSAGES = 100;
private ServerLocator locator;
@@ -206,7 +204,6 @@
session.close();
Assert.assertTrue(retry <= 5);
- closeSessionFactory();
}
public void testNonTransacted() throws Exception
@@ -231,8 +228,6 @@
receiveDurableMessages(consumer);
session.close();
-
- closeSessionFactory();
}
private void createSessionFactory() throws Exception
@@ -379,7 +374,7 @@
session.close();
- closeSessionFactory();
+
}
/**
@@ -430,12 +425,10 @@
message = consumer.receiveImmediate();
- Assert.assertNotNull(message);
+ Assert.assertNotNull("expecting a message", message);
Assert.assertEquals(counter,
message.getIntProperty("counter").intValue());
session.close();
-
- closeSessionFactory();
}
public void testTransactedMessagesNotSentSoNoRollback() throws Exception
@@ -472,7 +465,7 @@
session.close();
- closeSessionFactory();
+
}
public void testTransactedMessagesWithConsumerStartedBeforeFailover() throws
Exception
@@ -516,8 +509,6 @@
session.commit();
session.close();
-
- closeSessionFactory();
}
public void testTransactedMessagesConsumedSoRollback() throws Exception
@@ -560,8 +551,6 @@
session1.close();
session2.close();
-
- closeSessionFactory();
}
public void testTransactedMessagesNotConsumedSoNoRollback() throws Exception
@@ -638,8 +627,6 @@
session1.close();
session2.close();
-
- closeSessionFactory();
}
public void testXAMessagesSentSoRollbackOnEnd() throws Exception
@@ -682,8 +669,6 @@
Assert.assertNull(message);
session.close();
-
- closeSessionFactory();
}
public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
@@ -731,8 +716,6 @@
producer.close();
consumer.close();
session.close();
-
- closeSessionFactory();
}
// This might happen if 1PC optimisation kicks in
@@ -823,8 +806,6 @@
session.commit(xid2, false);
session.close();
-
- closeSessionFactory();
}
public void testXAMessagesConsumedSoRollbackOnEnd() throws Exception
@@ -869,8 +850,6 @@
session1.close();
session2.close();
-
- closeSessionFactory();
}
public void testXAMessagesConsumedSoRollbackOnPrepare() throws Exception
@@ -917,8 +896,6 @@
session1.close();
session2.close();
-
- closeSessionFactory();
}
// 1PC optimisation
@@ -968,8 +945,6 @@
session1.close();
session2.close();
-
- closeSessionFactory();
}
public void testCreateNewFactoryAfterFailover() throws Exception
@@ -992,8 +967,6 @@
session = sendAndConsume(sf, true);
session.close();
-
- closeSessionFactory();
}
public void testFailoverMultipleSessionsWithConsumers() throws Exception
@@ -1057,8 +1030,6 @@
}
sendSession.close();
-
- closeSessionFactory();
}
/*
@@ -1079,24 +1050,13 @@
session.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer, 0, NUM_MESSAGES, false);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
- }
-
crash(session);
receiveDurableMessages(consumer);
session.close();
-
- closeSessionFactory();
}
private void sendMessagesSomeDurable(ClientSession session, ClientProducer producer)
throws Exception, HornetQException
@@ -1147,8 +1107,6 @@
receiveDurableMessages(consumer);
session.close();
-
- closeSessionFactory();
}
private void receiveDurableMessages(ClientConsumer consumer) throws HornetQException
@@ -1211,16 +1169,14 @@
}
// Should get the same ones after failover since we didn't ack
- receiveMessagesAndAck(consumer, NUM_MESSAGES, NUM_MESSAGES * 2);
+ receiveMessages(consumer, NUM_MESSAGES, NUM_MESSAGES * 2, true);
session.close();
-
- closeSessionFactory();
}
private void receiveMessages(ClientConsumer consumer) throws HornetQException
{
- receiveMessagesAndAck(consumer, 0, NUM_MESSAGES);
+ receiveMessages(consumer, 0, NUM_MESSAGES, true);
}
public void testSimpleSendAfterFailoverDurableTemporary() throws Exception
@@ -1275,8 +1231,6 @@
receiveMessages(consumer);
session.close();
-
- closeSessionFactory();
}
public void _testForceBlockingReturn() throws Exception
@@ -1334,7 +1288,6 @@
Assert.assertEquals(sender.e.getCode(), HornetQException.UNBLOCKED);
session.close();
-
}
public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception
@@ -1344,6 +1297,7 @@
locator.setReconnectAttempts(-1);
locator.setBlockOnAcknowledge(true);
+
sf = createSessionFactoryAndWaitForTopology(locator, 2);
final ClientSession session = createSession(sf, false, false);
@@ -1402,7 +1356,7 @@
}
catch (HornetQException e2)
{
-
+ throw new RuntimeException(e2);
}
}
@@ -1428,7 +1382,7 @@
committer.join();
- Assert.assertFalse(committer.failed);
+ Assert.assertFalse("second attempt succeed?", committer.failed);
session.close();
@@ -1459,16 +1413,17 @@
try
{
session2.commit();
+ fail("expecting DUPLICATE_ID_REJECTED exception");
}
catch (HornetQException e)
{
- assertEquals(HornetQException.DUPLICATE_ID_REJECTED, e.getCode());
+ assertEquals(e.getMessage(), HornetQException.DUPLICATE_ID_REJECTED,
e.getCode());
}
- ClientConsumer consumer = session2.createConsumer(FailoverTestBase.ADDRESS);
session2.start();
+ ClientConsumer consumer = session2.createConsumer(FailoverTestBase.ADDRESS);
receiveMessages(consumer);
ClientMessage message = consumer.receiveImmediate();
@@ -1476,15 +1431,13 @@
Assert.assertNull(message);
session2.close();
-
- closeSessionFactory();
}
private void closeSessionFactory()
{
if (sf == null)
return;
- sf.close();
+ closeSessionFactory(sf);
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
}
@@ -1724,7 +1677,7 @@
session.close();
- closeSessionFactory();
+
}
public void testLiveAndBackupBackupComesBackNewFactory() throws Exception
@@ -1747,6 +1700,12 @@
// To reload security or other settings that are read during startup
beforeRestart(backupServer);
+ if (!backupServer.getServer().getConfiguration().isSharedStore())
+ {
+ // this test would not make sense in the remote replication use case, without
the following
+ backupServer.getServer().getConfiguration().setBackup(false);
+ }
+
backupServer.start();
assertTrue("session failure listener", listener.getLatch().await(5,
TimeUnit.SECONDS));
@@ -1779,7 +1738,7 @@
session.close();
- closeSessionFactory();
+
}
// Package protected ---------------------------------------------
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -16,6 +16,7 @@
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -81,6 +82,7 @@
protected NodeManager nodeManager;
protected boolean startBackupServer = true;
+ private final Collection<ServerLocator> serverLocators = new
ArrayList<ServerLocator>();
// Static --------------------------------------------------------
@@ -128,16 +130,12 @@
}
}
- protected TestableServer createLiveServer()
+ protected TestableServer createServer(Configuration config)
{
- return new SameProcessHornetQServer(createInVMFailoverServer(true, liveConfig,
nodeManager, 1));
+ return new SameProcessHornetQServer(
+ createInVMFailoverServer(true, config,
nodeManager, config.isBackup() ? 2 : 1));
}
- protected TestableServer createBackupServer()
- {
- return new SameProcessHornetQServer(createInVMFailoverServer(true, backupConfig,
nodeManager, 2));
- }
-
/**
* Large message version of {@link #setBody(int, ClientMessage)}.
* @param i
@@ -167,7 +165,7 @@
for (int j = 0; j < LARGE_MESSAGE_SIZE; j++)
{
- Assert.assertTrue("expecting " + LARGE_MESSAGE_SIZE + " bytes,
got " + j, buffer.readable());
+ Assert.assertTrue("msg " + i + ", expecting " +
LARGE_MESSAGE_SIZE + " bytes, got " + j, buffer.readable());
Assert.assertEquals("equal at " + j, UnitTestCase.getSamplebyte(j),
buffer.readByte());
}
}
@@ -189,7 +187,7 @@
backupConfig.getConnectorConfigurations().put(backupConnector.getName(),
backupConnector);
ReplicatedBackupUtils.createClusterConnectionConf(backupConfig,
backupConnector.getName(),
liveConnector.getName());
- backupServer = createBackupServer();
+ backupServer = createServer(backupConfig);
liveConfig = super.createDefaultConfig();
liveConfig.getAcceptorConfigurations().clear();
@@ -199,11 +197,13 @@
liveConfig.setClustered(true);
ReplicatedBackupUtils.createClusterConnectionConf(liveConfig,
liveConnector.getName());
liveConfig.getConnectorConfigurations().put(liveConnector.getName(),
liveConnector);
- liveServer = createLiveServer();
+ liveServer = createServer(liveConfig);
}
- protected void createReplicatedConfigs() throws Exception
+ protected void createReplicatedConfigs()
{
+ nodeManager = new InVMNodeManager();
+
final TransportConfiguration liveConnector =
getConnectorTransportConfiguration(true);
final TransportConfiguration backupConnector =
getConnectorTransportConfiguration(false);
final TransportConfiguration backupAcceptor =
getAcceptorTransportConfiguration(false);
@@ -215,18 +215,18 @@
ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector,
backupAcceptor, liveConfig,
liveConnector);
- backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() +
"_backup");
- backupConfig.setJournalDirectory(backupConfig.getJournalDirectory() +
"_backup");
- backupConfig.setPagingDirectory(backupConfig.getPagingDirectory() +
"_backup");
- backupConfig.setLargeMessagesDirectory(backupConfig.getLargeMessagesDirectory() +
"_backup");
+ final String sufix = "_backup";
+ backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + sufix);
+ backupConfig.setJournalDirectory(backupConfig.getJournalDirectory() + sufix);
+ backupConfig.setPagingDirectory(backupConfig.getPagingDirectory() + sufix);
+ backupConfig.setLargeMessagesDirectory(backupConfig.getLargeMessagesDirectory() +
sufix);
backupConfig.setSecurityEnabled(false);
- backupServer = createBackupServer();
+ backupServer = createServer(backupConfig);
liveConfig.getAcceptorConfigurations().clear();
liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
-
- liveServer = createLiveServer();
+ liveServer = createServer(liveConfig);
}
@Override
@@ -236,6 +236,15 @@
stopComponent(backupServer);
stopComponent(liveServer);
+ synchronized (serverLocators)
+ {
+ for (ServerLocator locator : serverLocators)
+ {
+ closeServerLocator(locator);
+ }
+ serverLocators.clear();
+ }
+
Assert.assertEquals(0, InVMRegistry.instance.size());
backupServer = null;
@@ -373,6 +382,10 @@
protected ServerLocatorInternal getServerLocator() throws Exception
{
ServerLocator locator =
HornetQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true),
getConnectorTransportConfiguration(false));
+ synchronized (serverLocators)
+ {
+ serverLocators.add(locator);
+ }
return (ServerLocatorInternal) locator;
}
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -20,6 +20,7 @@
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
+
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
@@ -32,7 +33,6 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.jms.client.HornetQTextMessage;
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -36,6 +36,8 @@
public class MultipleLivesMultipleBackupsFailoverTest extends
MultipleBackupsFailoverTestBase
{
protected Map<Integer, TestableServer> servers = new HashMap<Integer,
TestableServer>();
+ private ServerLocator locator2;
+ private ServerLocator locator;
@Override
protected void tearDown() throws Exception
@@ -54,6 +56,8 @@
}
}
}
+ closeServerLocator(locator);
+ closeServerLocator(locator2);
super.tearDown();
}
@@ -73,20 +77,20 @@
servers.get(3).start();
waitForServer(servers.get(3).getServer());
-
+
servers.get(1).start();
waitForServer(servers.get(1).getServer());
servers.get(2).start();
-
+
servers.get(4).start();
waitForServer(servers.get(4).getServer());
-
+
servers.get(5).start();
waitForServer(servers.get(4).getServer());
- ServerLocator locator = getServerLocator(0);
+ locator = getServerLocator(0);
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
@@ -101,7 +105,7 @@
int liveAfter0 = waitForNewLive(10000, true, servers, 1, 2);
- ServerLocator locator2 = getServerLocator(3);
+ locator2 = getServerLocator(3);
locator2.setBlockOnNonDurableSend(true);
locator2.setBlockOnDurableSend(true);
locator2.setBlockOnAcknowledge(true);
@@ -254,6 +258,7 @@
new SameProcessHornetQServer(createInVMFailoverServer(true, config0,
nodeManager, liveNode)));
}
+ @Override
protected boolean isNetty()
{
return false;
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFailoverTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFailoverTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyFailoverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -29,20 +29,6 @@
public class NettyFailoverTest extends FailoverTest
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean
live)
{
@@ -70,9 +56,4 @@
return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
}
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration.cluster.failover;
+import org.hornetq.core.config.Configuration;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
@@ -24,40 +25,15 @@
public class NettyReplicatedFailoverTest extends NettyFailoverTest
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
@Override
- protected TestableServer createLiveServer()
+ protected TestableServer createServer(Configuration config)
{
- return new SameProcessHornetQServer(createServer(true, liveConfig));
+ return new SameProcessHornetQServer(createServer(true, config));
}
@Override
- protected TestableServer createBackupServer()
- {
- return new SameProcessHornetQServer(createServer(true, backupConfig));
- }
-
- @Override
protected void createConfigs() throws Exception
{
createReplicatedConfigs();
}
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -220,17 +220,11 @@
}
@Override
- protected TestableServer createBackupServer()
+ protected TestableServer createServer(Configuration config)
{
- return new SameProcessHornetQServer(createServer(true, backupConfig));
+ return new SameProcessHornetQServer(createServer(true, config));
}
- @Override
- protected TestableServer createLiveServer()
- {
- return new SameProcessHornetQServer(createServer(true, liveConfig));
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Deleted:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/RemoteFailoverTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/RemoteFailoverTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/RemoteFailoverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -1,142 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.cluster.failover;
-
-import java.util.ArrayList;
-import java.util.Map;
-
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.config.ClusterConnectionConfiguration;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
-import org.hornetq.core.server.HornetQServers;
-import org.hornetq.core.server.JournalType;
-import org.hornetq.tests.integration.cluster.util.RemoteProcessHornetQServer;
-import org.hornetq.tests.integration.cluster.util.RemoteServerConfiguration;
-import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
-import org.hornetq.tests.integration.cluster.util.TestableServer;
-
-/**
- * A RemoteFailoverTest
- *
- * @author jmesnil
- *
- *
- */
-public class RemoteFailoverTest extends FailoverTest
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- public class SharedLiveServerConfiguration extends RemoteServerConfiguration
- {
-
- @Override
- public Configuration getConfiguration()
- {
- Configuration config = createDefaultConfig(generateParams(0, true),
NettyAcceptorFactory.class.getName());
- config.setJournalType(JournalType.NIO);
- config.setSharedStore(true);
- config.setClustered(true);
- config.getConnectorConfigurations().put("self",
- createTransportConfiguration(true,
false, generateParams(0, true)));
- config.getClusterConfigurations().add(new
ClusterConnectionConfiguration("cluster",
-
"foo",
-
"self",
- -1,
- false,
- false,
- 1,
- 1,
-
createStaticDiscoveryGroupConfiguration(new TransportConfiguration[0]),
-
false));
- return config;
- }
-
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- super.tearDown();
- //just to make sure
- if (liveServer != null)
- {
- try
- {
- liveServer.destroy();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- }
-
- @Override
- protected TestableServer createLiveServer()
- {
- return new
RemoteProcessHornetQServer(SharedLiveServerConfiguration.class.getName());
- }
-
- @Override
- protected TestableServer createBackupServer()
- {
- return new
SameProcessHornetQServer(HornetQServers.newHornetQServer(backupConfig));
- }
-
- protected TransportConfiguration getConnectorTransportConfiguration(final boolean
live) {
- Map<String, Object> params = null;
- if (live)
- {
- params = generateParams(0, true);
- } else
- {
- params = generateParams(1, true);
- }
- return createTransportConfiguration(true, false, params);
- }
-
- @Override
- protected TransportConfiguration getAcceptorTransportConfiguration(boolean live)
- {
- Map<String, Object> params = null;
- if (live)
- {
- params = generateParams(0, true);
- } else
- {
- params = generateParams(1, true);
- }
- return createTransportConfiguration(true, true, params);
- }
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/RemoteMultipleLivesMultipleBackupsFailoverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -22,7 +22,6 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.server.JournalType;
import org.hornetq.tests.integration.cluster.util.RemoteProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.RemoteServerConfiguration;
@@ -69,6 +68,7 @@
backups.put(5, SharedBackupServerConfiguration5.class.getName());
}
+ @Override
protected boolean isNetty()
{
return true;
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -21,8 +21,8 @@
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.JournalType;
@@ -38,6 +38,7 @@
// Attributes ----------------------------------------------------
private static Map<Integer, String> backups = new HashMap<Integer,
String>();
+ private ClientSessionFactoryInternal sf;
// Static --------------------------------------------------------
@@ -59,13 +60,13 @@
servers.get(2).start();
servers.get(3).start();
- ServerLocator locator = getServerLocator(0);
+ locator = (ServerLocatorInternal)getServerLocator(0);
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
int backupNode;
ClientSession session = sendAndConsume(sf, true);
System.out.println("failing live node ");
@@ -95,10 +96,6 @@
backupNode = waitForNewLive(5, true, servers, 0);
assertEquals(0, backupNode);
session = sendAndConsume(sf, false);
-
- locator.close();
-
- servers.get(0).stop();
}
// Package protected ---------------------------------------------
@@ -121,12 +118,11 @@
@Override
protected void tearDown() throws Exception
{
- super.tearDown();
- // make sure
for (TestableServer testableServer : servers.values())
{
try
{
+ stopComponent(testableServer);
testableServer.destroy();
}
catch (Exception e)
@@ -134,7 +130,20 @@
e.printStackTrace(); // To change body of catch statement use File | Settings
| File Templates.
}
}
+ servers.clear();
+ try
+ {
+ if (sf != null)
+ {
+ sf.close();
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ super.tearDown();
}
@Override
@@ -303,7 +312,6 @@
config1.getClusterConfigurations().add(ccc1);
config1.getConnectorConfigurations().put(backupConnector.getName(),
backupConnector);
- System.out.println(config1.getBindingsDirectory());
config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" +
liveNode);
config1.setJournalDirectory(config1.getJournalDirectory() + "_" +
liveNode);
config1.setPagingDirectory(config1.getPagingDirectory() + "_" +
liveNode);
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedAsynchronousFailoverTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedAsynchronousFailoverTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedAsynchronousFailoverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -13,8 +13,6 @@
package org.hornetq.tests.integration.cluster.failover;
-import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
-import org.hornetq.tests.integration.cluster.util.TestableServer;
/**
* A ReplicatedAsynchronousFailoverTest
@@ -23,20 +21,7 @@
*/
public class ReplicatedAsynchronousFailoverTest extends AsynchronousFailoverTest
{
-
@Override
- protected TestableServer createLiveServer()
- {
- return new SameProcessHornetQServer(createInVMFailoverServer(true, liveConfig,
nodeManager, 1));
- }
-
- @Override
- protected TestableServer createBackupServer()
- {
- return new SameProcessHornetQServer(createInVMFailoverServer(true, backupConfig,
nodeManager, 2));
- }
-
- @Override
protected void createConfigs() throws Exception
{
createReplicatedConfigs();
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -66,7 +66,7 @@
Assert.assertNotNull(msg);
- System.out.println(i + " msg = " + msg);
+ // System.out.println(i + " msg = " + msg);
int received = msg.getIntProperty("key");
@@ -101,7 +101,7 @@
Assert.assertNotNull(msg);
- System.out.println(i + " msg = " + msg);
+ // System.out.println(i + " msg = " + msg);
int received = (Integer)msg.getObjectProperty(new
SimpleString("key"));
@@ -131,7 +131,7 @@
Assert.assertNotNull(msg);
- System.out.println(i + " msg = " + msg);
+ // System.out.println(i + " msg = " + msg);
int received = msg.getIntProperty("key");
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -13,7 +13,6 @@
package org.hornetq.tests.integration.cluster.failover;
-import org.hornetq.core.logging.Logger;
/**
* A ReplicatedLargeMessageFailoverTest
@@ -24,17 +23,7 @@
*/
public class ReplicatedLargeMessageFailoverTest extends LargeMessageFailoverTest
{
- private static final Logger log =
Logger.getLogger(ReplicatedLargeMessageFailoverTest.class);
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
-
public ReplicatedLargeMessageFailoverTest()
{
super();
@@ -45,19 +34,9 @@
super(name);
}
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
@Override
protected void createConfigs() throws Exception
{
createReplicatedConfigs();
}
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageWithDelayFailoverTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageWithDelayFailoverTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageWithDelayFailoverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -21,6 +21,14 @@
protected void crash(ClientSession... sessions) throws Exception
{
syncDelay.deliverUpToDateMsg();
+ waitForBackup(null, 5);
super.crash(sessions);
}
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ syncDelay.deliverUpToDateMsg();
+ super.tearDown();
+ }
}
Copied:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java
(from rev 11800,
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java)
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java
(rev 0)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -0,0 +1,19 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.HashMap;
+
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.settings.impl.AddressSettings;
+
+public class ReplicatedPagedFailoverTest extends ReplicatedFailoverTest
+{
+ @Override
+ protected HornetQServer createInVMFailoverServer(final boolean realFiles, final
Configuration configuration,
+ final NodeManager nodeManager, int id)
+ {
+ return createInVMFailoverServer(realFiles, configuration, PAGE_SIZE, PAGE_MAX,
+ new HashMap<String, AddressSettings>(),
nodeManager, id);
+ }
+}
Deleted:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagedFailverOverTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagedFailverOverTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedPagedFailverOverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -1,19 +0,0 @@
-package org.hornetq.tests.integration.cluster.failover;
-
-import java.util.HashMap;
-
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.NodeManager;
-import org.hornetq.core.settings.impl.AddressSettings;
-
-public class ReplicatedPagedFailverOverTest extends ReplicatedFailoverTest
-{
- @Override
- protected HornetQServer createInVMFailoverServer(final boolean realFiles, final
Configuration configuration,
- final NodeManager nodeManager, int id)
- {
- return createInVMFailoverServer(realFiles, configuration, PAGE_SIZE, PAGE_MAX,
- new HashMap<String, AddressSettings>(),
nodeManager, id);
- }
-}
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -23,7 +23,6 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.core.config.ClusterConnectionConfiguration;
-import org.hornetq.core.config.Configuration;
import org.hornetq.core.security.Role;
import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.spi.core.security.HornetQSecurityManager;
@@ -68,6 +67,7 @@
ackBatchSize);
}
+ @Override
protected ClientSession createSession(ClientSessionFactory sf,
boolean autoCommitSends,
boolean autoCommitAcks,
@@ -77,16 +77,19 @@
.isPreAcknowledge(), ackBatchSize);
}
+ @Override
protected ClientSession createSession(ClientSessionFactory sf, boolean
autoCommitSends, boolean autoCommitAcks) throws Exception
{
return createSession(sf, autoCommitSends, autoCommitAcks,
sf.getServerLocator().getAckBatchSize());
}
+ @Override
protected ClientSession createSession(ClientSessionFactory sf) throws Exception
{
return createSession(sf, true, true, sf.getServerLocator().getAckBatchSize());
}
+ @Override
protected ClientSession createSession(ClientSessionFactory sf,
boolean xa,
boolean autoCommitSends,
@@ -98,6 +101,7 @@
/**
* @throws Exception
*/
+ @Override
protected void createConfigs() throws Exception
{
nodeManager = new InVMNodeManager();
@@ -128,7 +132,7 @@
dgb,
false);
backupConfig.getClusterConfigurations().add(cccLive);
- backupServer = createBackupServer();
+ backupServer = createServer(backupConfig);
HornetQSecurityManager securityManager = installSecurity(backupServer);
@@ -154,11 +158,12 @@
false);
liveConfig.getClusterConfigurations().add(ccc0);
liveConfig.getConnectorConfigurations().put(liveConnector.getName(),
liveConnector);
- liveServer = createLiveServer();
+ liveServer = createServer(liveConfig);
installSecurity(liveServer);
}
+ @Override
protected void beforeRestart(TestableServer server)
{
installSecurity(server);
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -17,7 +17,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.TransportConfiguration;
@@ -40,6 +39,7 @@
{
protected Map<Integer, TestableServer> servers = new HashMap<Integer,
TestableServer>();
+ protected ServerLocatorInternal locator;
private NodeManager nodeManager;
Logger log = Logger.getLogger(SingleLiveMultipleBackupsFailoverTest.class);
@@ -73,7 +73,7 @@
servers.get(4).start();
servers.get(5).start();
- ServerLocatorInternal locator = (ServerLocatorInternal)getServerLocator(0);
+ locator = (ServerLocatorInternal)getServerLocator(0);
Topology topology = locator.getTopology();
@@ -194,11 +194,11 @@
@Override
protected void tearDown() throws Exception
{
- for (Entry<Integer, TestableServer> entry : servers.entrySet())
+ for (TestableServer server : servers.values())
{
try
{
- entry.getValue().stop();
+ stopComponent(server);
}
catch (Exception e)
{
@@ -206,6 +206,7 @@
}
}
servers.clear();
+ closeServerLocator(locator);
super.tearDown();
}
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -16,12 +16,11 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServers;
/**
- *
+ *
* A MultiThreadRandomReattachTest
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
@@ -43,9 +42,6 @@
liveServer.start();
}
- /* (non-Javadoc)
- * @see
org.hornetq.tests.integration.cluster.failover.MultiThreadRandomReattachTestBase#setBody(org.hornetq.api.core.client.ClientMessage)
- */
@Override
protected void setBody(final ClientMessage message) throws Exception
{
@@ -53,9 +49,6 @@
message.getBodyBuffer().writeBytes(new byte[250]);
}
- /* (non-Javadoc)
- * @see
org.hornetq.tests.integration.cluster.failover.MultiThreadRandomReattachTestBase#checkSize(org.hornetq.api.core.client.ClientMessage)
- */
@Override
protected boolean checkSize(final ClientMessage message)
{
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -45,8 +45,8 @@
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
- *
*
+ *
*/
public abstract class MultiThreadRandomReattachTestBase extends
MultiThreadReattachSupport
{
@@ -276,7 +276,7 @@
ClientSession session = sf.createSession(false, false, false);
session.addMetaData("someData", RandomUtil.randomString());
session.addMetaData("someData2", RandomUtil.randomString());
-
+
return session;
}
@@ -828,8 +828,8 @@
ClientSession s = sf.createSession(false, false, false);
s.addMetaData("data", RandomUtil.randomString());
-
+
final int numMessages = 100;
final int numSessions = 10;
@@ -907,8 +907,8 @@
ClientSession sessConsume = sf.createSession(false, false, false);
sessConsume.addMetaData("data", RandomUtil.randomString());
-
+
sessConsume.start();
sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName,
null, false);
@@ -1159,7 +1159,7 @@
for (int i = 0; i < numSessions; i++)
{
ClientSession session = sf.createSession(false, false, false);
-
+
session.addMetaData("data", RandomUtil.randomString());
session.close();
@@ -1246,7 +1246,7 @@
{
return 2;
}
-
+
protected int getNumThreads()
{
return 10;
@@ -1263,10 +1263,7 @@
@Override
protected void tearDown() throws Exception
{
- if (liveServer != null && liveServer.isStarted())
- {
- liveServer.stop();
- }
+ stopComponent(liveServer);
liveServer = null;
@@ -1306,7 +1303,7 @@
@Override
protected void stop() throws Exception
{
- liveServer.stop();
+ stopComponent(liveServer);
System.gc();
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -70,7 +70,7 @@
public void destroy()
{
- if(serverProcess != null)
+ if (serverProcess != null)
{
serverProcess.destroy();
}
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -19,7 +19,6 @@
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
import org.hornetq.tests.util.CountDownSessionFailureListener;
@@ -31,8 +30,6 @@
*/
public class SameProcessHornetQServer implements TestableServer
{
- private static Logger log = Logger.getLogger(SameProcessHornetQServer.class);
-
private final HornetQServer server;
public SameProcessHornetQServer(HornetQServer server)
@@ -103,7 +100,8 @@
{
// Wait to be informed of failure
boolean ok = listener.getLatch().await(10000, TimeUnit.MILLISECONDS);
- Assert.assertTrue("Failed to stop the server! Latch count is " +
listener.getLatch().getCount(), ok);
+ Assert.assertTrue("Failed to stop the server! Latch count is " +
listener.getLatch().getCount() + " out of " +
+ sessions.length, ok);
}
}
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -1177,7 +1177,6 @@
conf.getAcceptorConfigurations().add(new
TransportConfiguration(InVMAcceptorFactory.class.getName()));
conf.setFileDeploymentEnabled(false);
server = HornetQServers.newHornetQServer(conf, mbeanServer, true);
- server.start();
serverManager = new JMSServerManagerImpl(server);
context = new InVMContext();
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -44,7 +44,6 @@
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.QueueBinding;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
@@ -73,17 +72,13 @@
* A JMSServerControlTest
*
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
- *
+ *
* Created 14 nov. 2008 13:35:10
*
*
*/
public class JMSServerControlTest extends ManagementTestBase
{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(JMSServerControlTest.class);
-
// Attributes ----------------------------------------------------
protected InVMContext context;
@@ -519,10 +514,10 @@
1, // reconnectAttempts
true, // failoverOnInitialConnection
"tst"); // groupID
-
-
+
+
HornetQQueueConnectionFactory cf =
(HornetQQueueConnectionFactory)context.lookup("tst");
-
+
assertEquals(true, cf.isHA());
assertEquals("tst", cf.getClientID());
assertEquals(1, cf.getClientFailureCheckPeriod());
@@ -551,15 +546,15 @@
assertEquals(1, cf.getReconnectAttempts());
assertEquals(true, cf.isFailoverOnInitialConnection());
assertEquals("tst", cf.getGroupID());
-
+
stopServer();
-
+
startServer();
-
+
control = createManagementControl();
-
+
cf = (HornetQQueueConnectionFactory)context.lookup("tst");
-
+
assertEquals(true, cf.isHA());
assertEquals("tst", cf.getClientID());
assertEquals(1, cf.getClientFailureCheckPeriod());
@@ -588,19 +583,19 @@
assertEquals(1, cf.getReconnectAttempts());
assertEquals(true, cf.isFailoverOnInitialConnection());
assertEquals("tst", cf.getGroupID());
-
+
control.destroyConnectionFactory("test");
-
+
ObjectNameBuilder nameBuilder =
ObjectNameBuilder.create(ConfigurationImpl.DEFAULT_JMX_DOMAIN);
assertFalse(mbeanServer.isRegistered(nameBuilder.getConnectionFactoryObjectName("test")));
-
+
stopServer();
-
+
startServer();
-
+
assertFalse(mbeanServer.isRegistered(nameBuilder.getConnectionFactoryObjectName("test")));
-
-
+
+
try
{
cf = (HornetQQueueConnectionFactory)context.lookup("tst");
@@ -609,8 +604,8 @@
catch (NamingException e)
{
}
-
-
+
+
}
public void testListPreparedTransactionDetails() throws Exception
@@ -737,9 +732,9 @@
serverManager.setContext(context);
serverManager.start();
serverManager.activated();
-
+
this.fakeJMSStorageManager = new
FakeJMSStorageManager(serverManager.getJMSStorageManager());
-
+
serverManager.replaceStorageManager(fakeJMSStorageManager);
}
@@ -819,9 +814,9 @@
Map<String, PersistedConnectionFactory> connectionFactoryMap = new
HashMap<String, PersistedConnectionFactory>();
ConcurrentHashMap<String, List<String>> persistedJNDIMap = new
ConcurrentHashMap<String, List<String>>();
-
+
JMSStorageManager delegate;
-
+
public FakeJMSStorageManager(JMSStorageManager delegate)
{
this.delegate = delegate;
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AcceptorControlTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AcceptorControlTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AcceptorControlTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -17,6 +17,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
@@ -25,10 +26,7 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.AcceptorControl;
import org.hornetq.api.core.management.NotificationType;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
@@ -41,7 +39,7 @@
* A AcceptorControlTest
*
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
- *
+ *
* Created 11 dec. 2008 17:38:58
*
*
@@ -49,13 +47,8 @@
public class AcceptorControlTest extends ManagementTestBase
{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(AcceptorControlTest.class);
-
- // Attributes ----------------------------------------------------
-
private HornetQServer service;
+ private ServerLocator locator;
// Static --------------------------------------------------------
@@ -98,7 +91,9 @@
// started by the server
Assert.assertTrue(acceptorControl.isStarted());
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(InVMConnectorFactory.class.getName()));
+ locator =
+ HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(
+
InVMConnectorFactory.class.getName()));
ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
Assert.assertNotNull(session);
@@ -113,7 +108,7 @@
sf.createSession(false, true, true);
Assert.fail("acceptor must not accept connections when stopped
accepting");
}
- catch (Exception e)
+ catch (HornetQException e)
{
}
@@ -136,7 +131,7 @@
sf.createSession(false, true, true);
Assert.fail("acceptor must not accept connections when stopped
accepting");
}
- catch (Exception e)
+ catch (HornetQException e)
{
}
@@ -186,11 +181,8 @@
@Override
protected void tearDown() throws Exception
{
- if (service != null)
- {
- service.stop();
- }
-
+ stopComponent(service);
+ closeServerLocator(locator);
super.tearDown();
}
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AcceptorControlUsingCoreTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AcceptorControlUsingCoreTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AcceptorControlUsingCoreTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -22,8 +22,6 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.AcceptorControl;
import org.hornetq.api.core.management.ResourceNames;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.tests.util.UnitTestCase;
/**
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AddressControlTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AddressControlTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AddressControlTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -55,7 +55,6 @@
// Attributes ----------------------------------------------------
private HornetQServer server;
-
protected ClientSession session;
private ServerLocator locator;
@@ -104,13 +103,13 @@
session.deleteQueue(anotherQueue);
}
-
+
public void testGetBindingNames() throws Exception
{
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
String divertName = RandomUtil.randomString();
-
+
session.createQueue(address, queue, false);
AddressControl addressControl = createManagementControl(address);
@@ -122,9 +121,9 @@
bindingNames = addressControl.getBindingNames();
Assert.assertEquals(2, bindingNames.length);
-
+
session.deleteQueue(queue);
-
+
bindingNames = addressControl.getBindingNames();
assertEquals(1, bindingNames.length);
assertEquals(divertName.toString(), bindingNames[0]);
@@ -316,12 +315,12 @@
@Override
protected void tearDown() throws Exception
{
- session.close();
+ if (session != null)
+ session.close();
- locator.close();
+ closeServerLocator(locator);
+ stopComponent(server);
- server.stop();
-
server = null;
session = null;
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -29,9 +29,7 @@
import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.Role;
import org.hornetq.core.server.HornetQServer;
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -28,16 +28,6 @@
public class SecurityManagementWithDefaultConfigurationTest extends
SecurityManagementTestBase
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
public void testSendManagementMessageWithDefaultClusterAdminUser() throws Exception
{
doSendManagementMessage(ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -63,6 +53,7 @@
protected HornetQServer setupAndStartHornetQServer() throws Exception
{
Configuration conf = createBasicConfig();
+ conf.setClusterPassword(ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
conf.setSecurityEnabled(true);
conf.getAcceptorConfigurations().add(new
TransportConfiguration(InVMAcceptorFactory.class.getName()));
HornetQServer server = HornetQServers.newHornetQServer(conf, false);
Modified:
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -311,10 +311,13 @@
{
TestInterceptor.value.set(false);
if (!session.isClosed())
- session.commit();
+ session.close();
if (!session2.isClosed())
- session2.commit();
+ session2.close();
+ if (sf != null)
+ sf.close();
}
+
}
public void testExceptionSettingActionBefore() throws Exception
@@ -552,23 +555,21 @@
@Override
protected void tearDown() throws Exception
{
-
stopComponent(manager);
manager = null;
+ stopComponent(backupServer);
+ backupServer = null;
stopComponent(liveServer);
liveServer = null;
- stopComponent(backupServer);
- backupServer = null;
+ closeServerLocator(locator);
- executor.shutdown();
+ executor.shutdownNow();
+ scheduledExecutor.shutdownNow();
- scheduledExecutor.shutdown();
-
tFactory = null;
scheduledExecutor = null;
super.tearDown();
-
}
protected
Modified:
branches/HORNETQ-316/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
---
branches/HORNETQ-316/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2011-12-01
18:03:10 UTC (rev 11806)
+++
branches/HORNETQ-316/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2011-12-01
20:07:29 UTC (rev 11807)
@@ -67,6 +67,7 @@
// Public --------------------------------------------------------
+ @Override
protected void setUp() throws Exception
{
super.setUp();
@@ -77,6 +78,7 @@
tmp.mkdirs();
}
+ @Override
protected void tearDown() throws Exception
{
super.tearDown();
@@ -423,7 +425,7 @@
dataRead = input.read(line);
if (dataRead > 0)
{
- System.out.println("Read one line with " + dataRead +
" bytes");
+ // System.out.println("Read one line with " + dataRead +
" bytes");
totalBytes.addAndGet(dataRead);
if (count.incrementAndGet() == 3)
{
@@ -576,9 +578,9 @@
for (int i = 100; i < byteArray.length; i += 10)
{
byte readBytes[] = new byte[10];
-
+
int size = is.read(readBytes);
-
+
for (int j = 0; j < size; j++)
{
assertEquals(getSamplebyte(i + j), readBytes[j]);