Author: clebert.suconic(a)jboss.com
Date: 2009-08-21 17:56:02 -0400 (Fri, 21 Aug 2009)
New Revision: 7879
Added:
trunk/src/main/org/hornetq/core/persistence/impl/journal/BatchingIDGenerator.java
trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java
trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
Modified:
trunk/.settings/org.eclipse.jdt.ui.prefs
trunk/src/main/org/hornetq/core/persistence/StorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
trunk/tests/src/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java
trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
trunk/tests/src/org/hornetq/tests/integration/server/LVQRecoveryTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
trunk/tests/src/org/hornetq/tests/util/SpawnedVMSupport.java
trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Fixing Batching IDs after restart
Modified: trunk/.settings/org.eclipse.jdt.ui.prefs
===================================================================
--- trunk/.settings/org.eclipse.jdt.ui.prefs 2009-08-21 20:41:47 UTC (rev 7878)
+++ trunk/.settings/org.eclipse.jdt.ui.prefs 2009-08-21 21:56:02 UTC (rev 7879)
@@ -1,4 +1,4 @@
-#Wed Feb 25 14:56:24 CET 2009
+#Fri Aug 21 14:45:31 CDT 2009
cleanup.add_default_serial_version_id=false
cleanup.add_generated_serial_version_id=true
cleanup.add_missing_annotations=true
@@ -60,7 +60,7 @@
org.eclipse.jdt.ui.ondemandthreshold=9999
org.eclipse.jdt.ui.overrideannotation=true
org.eclipse.jdt.ui.staticondemandthreshold=9999
-org.eclipse.jdt.ui.text.custom_code_templates=<?xml version\="1.0"
encoding\="UTF-8"?><templates><template autoinsert\="true"
context\="gettercomment_context" deleted\="false"
description\="Comment for getter method" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.gettercomment"
name\="gettercomment">/**\n * @return the ${bare_field_name}\n
*/</template><template autoinsert\="true"
context\="settercomment_context" deleted\="false"
description\="Comment for setter method" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.settercomment"
name\="settercomment">/**\n * @param ${param} the ${bare_field_name} to set\n
*/</template><template autoinsert\="true"
context\="constructorcomment_context" deleted\="false"
description\="Comment for created constructors" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.constructorcomment"
name\="constructorcomment">/**\n * ${tags}\n */</template><template
autoinsert\="false" context\="filecomment_context" delete!
d\="false" description\="Comment for created Java files"
enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.filecomment"
name\="filecomment"></template><template
autoinsert\="false" context\="typecomment_context"
deleted\="false" description\="Comment for created types"
enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.typecomment"
name\="typecomment">/**\n * A ${type_name}\n *\n * @author ${user}\n *\n *
${tags}\n *\n */</template><template autoinsert\="true"
context\="fieldcomment_context" deleted\="false"
description\="Comment for fields" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.fieldcomment"
name\="fieldcomment">/**\n * \n */</template><template
autoinsert\="true" context\="methodcomment_context"
deleted\="false" description\="Comment for non-overriding methods"
enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.methodcomment"
name\="methodcomment">/**\n * ${tags}\n */</template><template
autoinsert\="true" context\="overrideco!
mment_context" deleted\="false" description\="Comment for over!
riding m
ethods" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.overridecomment"
name\="overridecomment">/* (non-Javadoc)\n * ${see_to_overridden}\n
*/</template><template autoinsert\="true"
context\="delegatecomment_context" deleted\="false"
description\="Comment for delegate methods" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.delegatecomment"
name\="delegatecomment">/**\n * ${tags}\n * ${see_to_target}\n
*/</template><template autoinsert\="false"
context\="newtype_context" deleted\="false" description\="Newly
created files" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.newtype"
name\="newtype">/*\n * JBoss, Home of Professional Open Source\n * Copyright
2005-${year}, Red Hat Middleware LLC, and individual contributors\n * by the @authors tag.
See the copyright.txt in the distribution for a\n * full listing of individual
contributors.\n *\n * This is free software; you can redistribute it and/or modify it\n *
under the terms of the GNU Lesser!
General Public License as\n * published by the Free Software Foundation; either version
2.1 of\n * the License, or (at your option) any later version.\n *\n * This software is
distributed in the hope that it will be useful,\n * but WITHOUT ANY WARRANTY; without even
the implied warranty of\n * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU\n * Lesser General Public License for more details.\n *\n * You should have received a
copy of the GNU Lesser General Public\n * License along with this software; if not, write
to the Free\n * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA\n *
02110-1301 USA, or see the FSF site\: http\://www.fsf.org.\n
*/\n\n\n${filecomment}\n${package_declaration}\n\n${typecomment}\n${type_declaration}</template><template
autoinsert\="false" context\="classbody_context"
deleted\="false" description\="Code in new class type bodies"
enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.classbody"
name\="classbod!
y">\n // Constants -----------------------------------------!
--------
----\n\n // Attributes ----------------------------------------------------\n\n //
Static --------------------------------------------------------\n\n // Constructors
--------------------------------------------------\n\n // Public
--------------------------------------------------------\n\n // Package protected
---------------------------------------------\n\n // Protected
-----------------------------------------------------\n\n // Private
-------------------------------------------------------\n\n // Inner classes
-------------------------------------------------\n\n</template><template
autoinsert\="true" context\="interfacebody_context"
deleted\="false" description\="Code in new interface type bodies"
enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.interfacebody"
name\="interfacebody">\n</template><template
autoinsert\="true" context\="enumbody_context"
deleted\="false" description\="Code in new enum type bodies"
enabled\="true" id\="org.eclips!
e.jdt.ui.text.codetemplates.enumbody"
name\="enumbody">\n</template><template autoinsert\="true"
context\="annotationbody_context" deleted\="false"
description\="Code in new annotation type bodies" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.annotationbody"
name\="annotationbody">\n</template><template
autoinsert\="true" context\="catchblock_context"
deleted\="false" description\="Code in new catch blocks"
enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.catchblock"
name\="catchblock">// ${todo} Auto-generated catch
block\n${exception_var}.printStackTrace();</template><template
autoinsert\="true" context\="methodbody_context"
deleted\="false" description\="Code in created method stubs"
enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.methodbody"
name\="methodbody">// ${todo} Auto-generated method
stub\n${body_statement}</template><template autoinsert\="true"
context\="constructorbody_context" deleted\="false"
description\="Code in created !
constructor stubs" enabled\="true" id\="org.eclipse.jdt.ui.tex!
t.codete
mplates.constructorbody" name\="constructorbody">${body_statement}\n//
${todo} Auto-generated constructor stub</template><template
autoinsert\="true" context\="getterbody_context"
deleted\="false" description\="Code in created getters"
enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.getterbody"
name\="getterbody">return ${field};</template><template
autoinsert\="true" context\="setterbody_context"
deleted\="false" description\="Code in created setters"
enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.setterbody"
name\="setterbody">${field} \= ${param};</template></templates>
+org.eclipse.jdt.ui.text.custom_code_templates=<?xml version\="1.0"
encoding\="UTF-8" standalone\="no"?><templates><template
autoinsert\="true" context\="gettercomment_context"
deleted\="false" description\="Comment for getter method"
enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.gettercomment"
name\="gettercomment">/**\n * @return the ${bare_field_name}\n
*/</template><template autoinsert\="true"
context\="settercomment_context" deleted\="false"
description\="Comment for setter method" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.settercomment"
name\="settercomment">/**\n * @param ${param} the ${bare_field_name} to set\n
*/</template><template autoinsert\="true"
context\="constructorcomment_context" deleted\="false"
description\="Comment for created constructors" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.constructorcomment"
name\="constructorcomment">/**\n * ${tags}\n */</template><template
autoinsert\="false" context\="filecommen!
t_context" deleted\="false" description\="Comment for created Java
files" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.filecomment"
name\="filecomment"/><template autoinsert\="false"
context\="typecomment_context" deleted\="false"
description\="Comment for created types" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.typecomment"
name\="typecomment">/**\n * A ${type_name}\n *\n * @author ${user}\n *\n *
${tags}\n *\n */</template><template autoinsert\="true"
context\="fieldcomment_context" deleted\="false"
description\="Comment for fields" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.fieldcomment"
name\="fieldcomment">/**\n * \n */</template><template
autoinsert\="true" context\="methodcomment_context"
deleted\="false" description\="Comment for non-overriding methods"
enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.methodcomment"
name\="methodcomment">/**\n * ${tags}\n */</template><template
autoinsert\="true" context\="ove!
rridecomment_context" deleted\="false" description\="Comment f!
or overr
iding methods" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.overridecomment"
name\="overridecomment">/* (non-Javadoc)\n * ${see_to_overridden}\n
*/</template><template autoinsert\="true"
context\="delegatecomment_context" deleted\="false"
description\="Comment for delegate methods" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.delegatecomment"
name\="delegatecomment">/**\n * ${tags}\n * ${see_to_target}\n
*/</template><template autoinsert\="false"
context\="newtype_context" deleted\="false" description\="Newly
created files" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.newtype"
name\="newtype">/*\n * Copyright 2009 Red Hat, Inc.\n * Red Hat licenses this
file to you under the Apache License, version\n * 2.0 (the "License"); you may
not use this file except in compliance\n * with the License. You may obtain a copy of the
License at\n * http\://www.apache.org/licenses/LICENSE-2.0\n * Unless required by
applicable law or agreed to in!
writing, software\n * distributed under the License is distributed on an "AS
IS" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or\n *
implied. See the License for the specific language governing\n * permissions and
limitations under the License.\n
*/\n\n${filecomment}\n${package_declaration}\n\n${typecomment}\n${type_declaration}</template><template
autoinsert\="false" context\="classbody_context"
deleted\="false" description\="Code in new class type bodies"
enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.classbody"
name\="classbody">\n // Constants
-----------------------------------------------------\n\n // Attributes
----------------------------------------------------\n\n // Static
--------------------------------------------------------\n\n // Constructors
--------------------------------------------------\n\n // Public
--------------------------------------------------------\n\n // Package protected
------------------!
---------------------------\n\n // Protected ---------------!
--------
------------------------------\n\n // Private
-------------------------------------------------------\n\n // Inner classes
-------------------------------------------------\n\n</template><template
autoinsert\="true" context\="interfacebody_context"
deleted\="false" description\="Code in new interface type bodies"
enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.interfacebody"
name\="interfacebody">\n</template><template
autoinsert\="true" context\="enumbody_context"
deleted\="false" description\="Code in new enum type bodies"
enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.enumbody"
name\="enumbody">\n</template><template autoinsert\="true"
context\="annotationbody_context" deleted\="false"
description\="Code in new annotation type bodies" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.annotationbody"
name\="annotationbody">\n</template><template
autoinsert\="true" context\="catchblock_context"
deleted\="false" description\="Code in new c!
atch blocks" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.catchblock"
name\="catchblock">// ${todo} Auto-generated catch
block\n${exception_var}.printStackTrace();</template><template
autoinsert\="true" context\="methodbody_context"
deleted\="false" description\="Code in created method stubs"
enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.methodbody"
name\="methodbody">// ${todo} Auto-generated method
stub\n${body_statement}</template><template autoinsert\="true"
context\="constructorbody_context" deleted\="false"
description\="Code in created constructor stubs" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.constructorbody"
name\="constructorbody">${body_statement}\n// ${todo} Auto-generated
constructor stub</template><template autoinsert\="true"
context\="getterbody_context" deleted\="false" description\="Code
in created getters" enabled\="true"
id\="org.eclipse.jdt.ui.text.codetemplates.getterbody"
name\="getterbody">return ${field};!
</template><template autoinsert\="true" context\="setterbody_c!
ontext"
deleted\="false" description\="Code in created setters"
enabled\="true" id\="org.eclipse.jdt.ui.text.codetemplates.setterbody"
name\="setterbody">${field} \= ${param};</template></templates>
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-08-21 20:41:47
UTC (rev 7878)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-08-21 21:56:02
UTC (rev 7879)
@@ -51,8 +51,6 @@
long generateUniqueID();
long getCurrentUniqueID();
-
- void setUniqueIDSequence(long id);
void storeMessage(ServerMessage message) throws Exception;
Added: trunk/src/main/org/hornetq/core/persistence/impl/journal/BatchingIDGenerator.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/BatchingIDGenerator.java
(rev 0)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/BatchingIDGenerator.java 2009-08-21
21:56:02 UTC (rev 7879)
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence.impl.journal;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.IDGenerator;
+
+/**
+ * A BatchingIDGenerator
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ * @author <mailto:tim.fox@jboss.org">Tim Fox</a>
+ *
+ *
+ */
+public class BatchingIDGenerator implements IDGenerator
+{
+
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(BatchingIDGenerator.class);
+
+
+ public static final byte ID_COUNTER_RECORD = 24;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ private final AtomicLong counter;
+
+ private final Journal journalStorage;
+
+ private final long checkpointSize;
+
+ private volatile long nextID;
+
+ public BatchingIDGenerator(final long start, final long checkpointSize, final Journal
journalstorage)
+ {
+ counter = new AtomicLong(start);
+
+ // as soon as you generate the first ID, the nextID should be updated
+ nextID = start;
+
+ this.checkpointSize = checkpointSize;
+
+
+ this.journalStorage = journalstorage;
+ }
+
+ public void close()
+ {
+ storeID(counter.incrementAndGet(), counter.get());
+ }
+
+ public void loadState(final long journalID, final HornetQBuffer buffer)
+ {
+ IDCounterEncoding encoding = new IDCounterEncoding();
+
+ encoding.decode(buffer);
+
+ // Keep nextID and counter the same, the next generateID will update the
checkpoint
+ nextID = encoding.id;
+
+ counter.set(nextID);
+ }
+
+ public long generateID()
+ {
+ long id = counter.getAndIncrement();
+
+ if (id >= nextID)
+ {
+ saveCheckPoint(id);
+
+ return id;
+ }
+ else
+ {
+ return id;
+ }
+ }
+
+ public long getCurrentID()
+ {
+ return counter.get();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private synchronized void saveCheckPoint(final long id)
+ {
+ if (id >= nextID)
+ {
+ nextID += checkpointSize;
+ storeID(counter.incrementAndGet(), nextID);
+ }
+ }
+
+
+ private void storeID(final long journalID, final long id)
+ {
+ try
+ {
+ journalStorage.appendAddRecord(id, ID_COUNTER_RECORD, new IDCounterEncoding(id),
true);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to store id", e);
+ }
+ }
+
+
+ // Inner classes -------------------------------------------------
+
+ static class IDCounterEncoding implements EncodingSupport
+ {
+ long id;
+
+ IDCounterEncoding(final long id)
+ {
+ this.id = id;
+ }
+
+ IDCounterEncoding()
+ {
+ }
+
+ public void decode(final HornetQBuffer buffer)
+ {
+ id = buffer.readLong();
+ }
+
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(id);
+ }
+
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_LONG;
+ }
+
+ }
+
+}
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-08-21
20:41:47 UTC (rev 7878)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-08-21
21:56:02 UTC (rev 7879)
@@ -92,8 +92,6 @@
public static final byte PERSISTENT_ID_RECORD = 23;
- public static final byte ID_COUNTER_RECORD = 24;
-
// type + expiration + timestamp + priority
public static final int SIZE_FIELDS = SIZE_INT + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
@@ -117,7 +115,7 @@
private UUID persistentID;
- private final BatchingIDGenerator idGenerator = new BatchingIDGenerator(0,
CHECKPOINT_BATCH_SIZE);
+ private final BatchingIDGenerator idGenerator;
private final Journal messageJournal;
@@ -216,6 +214,9 @@
{
throw new IllegalArgumentException("Unsupported journal type " +
config.getJournalType());
}
+
+
+ this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE,
bindingsJournal);
messageJournal = new JournalImpl(config.getJournalFileSize(),
config.getJournalMinFiles(),
@@ -262,11 +263,6 @@
return idGenerator.getCurrentID();
}
- public void setUniqueIDSequence(final long id)
- {
- idGenerator.setID(id);
- }
-
public LargeServerMessage createLargeMessage()
{
return new JournalLargeServerMessage(this);
@@ -969,21 +965,15 @@
persistentID = encoding.uuid;
}
- else if (rec == ID_COUNTER_RECORD)
+ else if (rec == BatchingIDGenerator.ID_COUNTER_RECORD)
{
- IDCounterEncoding encoding = new IDCounterEncoding();
-
- encoding.decode(buffer);
-
- lastID = encoding.id;
+ idGenerator.loadState(record.id, buffer);
}
else
{
throw new IllegalStateException("Invalid record type " + rec);
}
}
-
- idGenerator.setID(lastID + 1);
}
// HornetQComponent implementation
@@ -1128,79 +1118,6 @@
// Inner Classes
// ----------------------------------------------------------------------------
- private class BatchingIDGenerator implements IDGenerator
- {
- private final AtomicLong counter;
-
- private final long checkpointSize;
-
- private volatile long nextID;
-
- public BatchingIDGenerator(final long start, final long checkpointSize)
- {
- this.counter = new AtomicLong(start);
-
- this.checkpointSize = checkpointSize;
-
- nextID = start + checkpointSize;
- }
-
- public void setID(final long id)
- {
- this.counter.set(id);
-
- nextID = id + checkpointSize;
- }
-
- public long generateID()
- {
- long id = counter.getAndIncrement();
-
- if (id >= nextID)
- {
- saveCheckPoint(id);
-
- return id;
- }
- else
- {
- return id;
- }
- }
-
- private synchronized void saveCheckPoint(final long id)
- {
- if (id >= nextID)
- {
- storeID(id);
-
- nextID += checkpointSize;
- }
- }
-
- public long getCurrentID()
- {
- return counter.get();
- }
-
- public void close()
- {
- storeID(counter.get());
- }
-
- private void storeID(final long id)
- {
- try
- {
- bindingsJournal.appendAddRecord(id, ID_COUNTER_RECORD, new
IDCounterEncoding(id), true);
- }
- catch (Exception e)
- {
- log.error("Failed to store id", e);
- }
- }
- }
-
private static class XidEncoding implements EncodingSupport
{
final Xid xid;
@@ -1334,36 +1251,6 @@
}
- private static class IDCounterEncoding implements EncodingSupport
- {
- long id;
-
- IDCounterEncoding(final long id)
- {
- this.id = id;
- }
-
- IDCounterEncoding()
- {
- }
-
- public void decode(final HornetQBuffer buffer)
- {
- id = buffer.readLong();
- }
-
- public void encode(final HornetQBuffer buffer)
- {
- buffer.writeLong(id);
- }
-
- public int getEncodeSize()
- {
- return SIZE_LONG;
- }
-
- }
-
private static class LargeMessageEncoding implements EncodingSupport
{
private final LargeServerMessage message;
Added: trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java
(rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java 2009-08-21
21:56:02 UTC (rev 7879)
@@ -0,0 +1,295 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.util.ArrayList;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.SpawnedVMSupport;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A JournalCrashTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalCrashTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final int FIRST_RUN = 4;
+
+ private static final int SECOND_RUN = 8;
+
+ private static final int THIRD_RUN = 100;
+
+ private static final int FOURTH_RUN = 400;
+
+ private HornetQServer server;
+
+ private ClientSessionFactory factory;
+
+ private SimpleString QUEUE = new SimpleString("queue");
+
+ protected void tearDown() throws Exception
+ {
+ stopServer();
+
+ printJournal();
+
+ super.tearDown();
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void startServer() throws Exception
+ {
+ Configuration config = createDefaultConfig();
+ config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
+
config.setJournalCompactMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_MIN_FILES);
+
config.setJournalCompactPercentage(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE);
+ config.setJournalMinFiles(2);
+
+ server = super.createServer(true, config);
+
+ server.start();
+
+ factory = createInVMFactory();
+ }
+
+ protected void stopServer() throws Exception
+ {
+
+ try
+ {
+ factory.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ factory = null;
+
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ server = null;
+ }
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+ * The test needs another VM, that will be "killed" right after commit. This
main will do this job.
+ */
+ public static void main(String arg[])
+ {
+ try
+ {
+ int start = 4;
+ int end = 8;
+
+ if (arg.length > 0)
+ {
+ start = Integer.parseInt(arg[0]);
+ }
+
+ if (arg.length > 1)
+ {
+ end = Integer.parseInt(arg[1]);
+ }
+
+ JournalCrashTest restart = new JournalCrashTest();
+
+ restart.startServer();
+
+ restart.sendMessages(start, end);
+
+ System.out.println("....end");
+ System.out.flush();
+
+ Runtime.getRuntime().halt(100);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(System.out);
+ System.exit(1);
+ }
+ }
+
+ public void sendMessages(int start, int end) throws Exception
+ {
+ ClientSession session = null;
+ try
+ {
+
+ session = factory.createSession(false, false);
+
+ try
+ {
+ session.createQueue(QUEUE, QUEUE, true);
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ ClientProducer prod = session.createProducer(QUEUE);
+
+ for (int i = start; i < end; i++)
+ {
+ ClientMessage msg = session.createClientMessage(true);
+ msg.getProperties().putIntProperty(new SimpleString("key"), i);
+ msg.getBody().writeUTF("message " + i);
+ prod.send(msg);
+ }
+
+ session.commit();
+ session.close();
+ // server.stop(); -- this test was not supposed to stop the server, it should
crash
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+
+ public void testRestartJournal() throws Throwable
+ {
+ runExternalProcess(0, FIRST_RUN);
+ runExternalProcess(FIRST_RUN, SECOND_RUN);
+ runExternalProcess(SECOND_RUN, THIRD_RUN);
+ runExternalProcess(THIRD_RUN, FOURTH_RUN);
+
+ printJournal();
+
+ ClientSession session = null;
+ try
+ {
+ startServer();
+
+ session = factory.createSession(true, true);
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+ session.start();
+
+ for (int i = 0; i < FOURTH_RUN; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+
+ assertNotNull("Msg at " + i, msg);
+
+ msg.acknowledge();
+
+ assertEquals(i, msg.getProperty(new SimpleString("key")));
+ }
+ session.close();
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ /**
+ * @throws Exception
+ * @throws InterruptedException
+ */
+ private void runExternalProcess(int start, int end) throws Exception,
InterruptedException
+ {
+ System.err.println("running external process...");
+ Process process = SpawnedVMSupport.spawnVM(this.getClass().getCanonicalName(),
+ "-Xms128m -Xmx128m ",
+ new String[] {},
+ true,
+ true,
+ Integer.toString(start),
+ Integer.toString(end));
+
+ assertEquals(100, process.waitFor());
+ }
+
+ /**
+ * @throws Exception
+ */
+ private void printJournal() throws Exception
+ {
+ AIOSequentialFileFactory factory = new AIOSequentialFileFactory(getJournalDir());
+ JournalImpl journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
+ 2,
+ 0,
+ 0,
+ factory,
+ "hornetq-data",
+ "hq",
+ 100);
+
+ ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
+ ArrayList<PreparedTransactionInfo> transactions = new
ArrayList<PreparedTransactionInfo>();
+
+ journal.start();
+ journal.load(records, transactions);
+
+ System.out.println("===============================================");
+ System.out.println("Journal records at the end:");
+
+ for (RecordInfo record : records)
+ {
+ System.out.println(record.id + ", update = " + record.isUpdate);
+ }
+ journal.stop();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java 2009-08-21
20:41:47 UTC (rev 7878)
+++
trunk/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java 2009-08-21
21:56:02 UTC (rev 7879)
@@ -238,12 +238,8 @@
*/
private void setUp(final boolean persistDeliveryCountBeforeDelivery) throws Exception,
HornetQException
{
- Configuration config = createConfigForJournal();
- config.setJournalFileSize(10 * 1024);
- config.setJournalMinFiles(2);
- config.setSecurityEnabled(false);
+ Configuration config = createDefaultConfig();
config.setPersistDeliveryCountBeforeDelivery(persistDeliveryCountBeforeDelivery);
- config.getAcceptorConfigurations().add(new
TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
server = createServer(true, config);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java 2009-08-21
20:41:47 UTC (rev 7878)
+++
trunk/tests/src/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java 2009-08-21
21:56:02 UTC (rev 7879)
@@ -24,24 +24,27 @@
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.SimpleString;
+/**
+ *
+ * A SessionCloseOnGCTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ * @author <mailto:tim.fox@jboss.org">Tim Fox</a>
+ *
+ *
+ */
public class SessionCloseOnGCTest extends ServiceTestBase
{
private static final Logger log = Logger.getLogger(SessionCloseOnGCTest.class);
private HornetQServer server;
- private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
-
@Override
protected void setUp() throws Exception
{
super.setUp();
server = createServer(false);
- server.getConfiguration()
- .getAcceptorConfigurations()
- .add(new
TransportConfiguration(NettyAcceptorFactory.class.getCanonicalName()));
-
server.start();
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java 2009-08-21
20:41:47 UTC (rev 7878)
+++
trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java 2009-08-21
21:56:02 UTC (rev 7879)
@@ -47,7 +47,8 @@
public void testLargeMessageCopy() throws Exception
{
clearData();
- Configuration configuration = createConfigForJournal();
+
+ Configuration configuration = createDefaultConfig();
configuration.start();
Modified: trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-08-21
20:41:47 UTC (rev 7878)
+++
trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-08-21
21:56:02 UTC (rev 7879)
@@ -56,7 +56,7 @@
File testdir = new File(getTestDir());
deleteDirectory(testdir);
- Configuration configuration = createConfigForJournal();
+ Configuration configuration = createDefaultConfig();
configuration.start();
Modified: trunk/tests/src/org/hornetq/tests/integration/server/LVQRecoveryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/server/LVQRecoveryTest.java 2009-08-21
20:41:47 UTC (rev 7878)
+++ trunk/tests/src/org/hornetq/tests/integration/server/LVQRecoveryTest.java 2009-08-21
21:56:02 UTC (rev 7879)
@@ -196,10 +196,8 @@
super.setUp();
clearData();
- configuration = createConfigForJournal();
+ configuration = createDefaultConfig();
configuration.setSecurityEnabled(false);
- TransportConfiguration transportConfig = new
TransportConfiguration(INVM_ACCEPTOR_FACTORY);
- configuration.getAcceptorConfigurations().add(transportConfig);
server = createServer(true, configuration);
// start the server
server.start();
Added:
trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java 2009-08-21
21:56:02 UTC (rev 7879)
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.persistence.impl;
+
+import java.io.File;
+import java.util.ArrayList;
+
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.persistence.impl.journal.BatchingIDGenerator;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * A BatchIDGeneratorUnitTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class BatchIDGeneratorUnitTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testSequence() throws Exception
+ {
+ NIOSequentialFileFactory factory = new NIOSequentialFileFactory(getTestDir());
+ Journal journal = new JournalImpl(10 * 1024, 2, 0, 0, factory,
"test-data", "tst", 1);
+
+ journal.start();
+
+ journal.load(new ArrayList<RecordInfo>(), new
ArrayList<PreparedTransactionInfo>());
+
+ BatchingIDGenerator batch = new BatchingIDGenerator(0, 1000, journal);
+ long id1 = batch.generateID();
+ long id2 = batch.generateID();
+
+ assertTrue(id2 > id1);
+
+ journal.stop();
+ batch = new BatchingIDGenerator(0, 1000, journal);
+ loadIDs(journal, batch);
+
+ long id3 = batch.generateID();
+
+ assertEquals(1000, id3);
+
+ long id4 = batch.generateID();
+
+ assertTrue(id4 > id3 && id4 < 2000);
+
+ batch.close();
+
+ journal.stop();
+ batch = new BatchingIDGenerator(0, 1000, journal);
+ loadIDs(journal, batch);
+
+ long id5 = batch.generateID();
+ assertTrue(id5 > id4 && id5 < 2000);
+
+
+ long lastId = id5;
+
+ boolean close = true;
+ for (int i = 0 ; i < 100000; i++)
+ {
+ if (i % 1000 == 0)
+ {
+ System.out.println("lastId = " + lastId);
+ // interchanging closes and simulated crashes
+ if (close)
+ {
+ batch.close();
+ }
+
+ close = !close;
+
+ journal.stop();
+ batch = new BatchingIDGenerator(0, 1000, journal);
+ loadIDs(journal, batch);
+ }
+
+ long id = batch.generateID();
+
+ assertTrue(id > lastId);
+
+ lastId = id;
+ }
+
+ System.out.println("LastID = " + lastId);
+
+ }
+
+ protected void loadIDs(Journal journal, BatchingIDGenerator batch) throws Exception
+ {
+ ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
+ ArrayList<PreparedTransactionInfo> tx = new
ArrayList<PreparedTransactionInfo>();
+
+ journal.start();
+ journal.load(records, tx);
+
+ assertEquals(0, tx.size());
+
+ assertTrue(records.size() > 0);
+
+ for (RecordInfo record : records)
+ {
+ if (record.userRecordType == BatchingIDGenerator.ID_COUNTER_RECORD)
+ {
+ HornetQBuffer buffer = ChannelBuffers.wrappedBuffer(record.data);
+ batch.loadState(record.id, buffer);
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-08-21
20:41:47 UTC (rev 7878)
+++
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-08-21
21:56:02 UTC (rev 7879)
@@ -75,7 +75,7 @@
SimpleString ADDRESS = new SimpleString("address");
- Configuration configuration = createConfigForJournal();
+ Configuration configuration = createDefaultConfig();
configuration.start();
Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-08-21 20:41:47 UTC
(rev 7878)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-08-21 21:56:02 UTC
(rev 7879)
@@ -121,17 +121,6 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
- protected Configuration createConfigForJournal()
- {
- Configuration config = new ConfigurationImpl();
- config.setJournalDirectory(getJournalDir());
- config.setBindingsDirectory(getBindingsDir());
- config.setJournalType(JournalType.ASYNCIO);
- config.setLargeMessagesDirectory(getLargeMessagesDir());
- config.setJournalCompactMinFiles(0);
- config.setJournalCompactPercentage(0);
- return config;
- }
protected HornetQServer createServer(final boolean realFiles,
final Configuration configuration,
Modified: trunk/tests/src/org/hornetq/tests/util/SpawnedVMSupport.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/SpawnedVMSupport.java 2009-08-21 20:41:47 UTC
(rev 7878)
+++ trunk/tests/src/org/hornetq/tests/util/SpawnedVMSupport.java 2009-08-21 21:56:02 UTC
(rev 7879)
@@ -68,13 +68,14 @@
final boolean logOutput,
final String... args) throws Exception
{
- return spawnVM(className, "-Xms512m -Xmx512m ", vmargs, logOutput,
args);
+ return spawnVM(className, "-Xms512m -Xmx512m ", vmargs, logOutput, false,
args);
}
public static Process spawnVM(final String className,
final String memoryArgs,
final String[] vmargs,
final boolean logOutput,
+ final boolean logErrorOutput,
final String... args) throws Exception
{
StringBuffer sb = new StringBuffer();
@@ -137,7 +138,7 @@
// Adding a reader to System.err, so the VM won't hang on a System.err.println
as identified on this forum thread:
//
http://www.jboss.org/index.html?module=bb&op=viewtopic&t=151815
- ProcessLogger errorLogger = new ProcessLogger(false, process.getErrorStream(),
className);
+ ProcessLogger errorLogger = new ProcessLogger(logErrorOutput,
process.getErrorStream(), className);
errorLogger.start();
return process;
Modified: trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-08-21 20:41:47 UTC (rev
7878)
+++ trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-08-21 21:56:02 UTC (rev
7879)
@@ -549,6 +549,8 @@
{
super.setUp();
+ deleteDirectory(new File(getTestDir()));
+
InVMRegistry.instance.clear();
// checkFreePort(TransportConstants.DEFAULT_PORT);