JBoss hornetq SVN: r8442 - trunk.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-28 03:48:09 -0500 (Sat, 28 Nov 2009)
New Revision: 8442
Modified:
trunk/.classpath
Log:
fixed eclipse's .classpath
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2009-11-28 08:42:53 UTC (rev 8441)
+++ trunk/.classpath 2009-11-28 08:48:09 UTC (rev 8442)
@@ -100,7 +100,7 @@
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="tests/tmpfiles"/>
<classpathentry kind="lib" path="thirdparty/net/java/dev/javacc/lib/javacc.jar"/>
- <classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar" sourcepath="/home/tim/workspace/netty-3.1.5.GA/src/main"/>
+ <classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar"/>
<classpathentry kind="lib" path="thirdparty/log4j/lib/log4j.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/naming/lib/jnpserver.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/security/lib/jbosssx.jar"/>
15 years
JBoss hornetq SVN: r8441 - in trunk: src/main/org/hornetq/core/journal/impl and 2 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-28 03:42:53 -0500 (Sat, 28 Nov 2009)
New Revision: 8441
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
Log:
mainly fixed JMSFailoverTest
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-28 03:29:29 UTC (rev 8440)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-28 08:42:53 UTC (rev 8441)
@@ -144,7 +144,7 @@
sessionExecutor = executor;
this.clientWindowSize = clientWindowSize;
-
+
this.ackBatchSize = ackBatchSize;
}
@@ -585,7 +585,7 @@
if (clientWindowSize >= 0)
{
creditsToSend += messageBytes;
-
+
if (creditsToSend >= clientWindowSize)
{
if (clientWindowSize == 0 && discountSlowConsumer)
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2009-11-28 03:29:29 UTC (rev 8440)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2009-11-28 08:42:53 UTC (rev 8441)
@@ -82,13 +82,13 @@
}
public synchronized void reset()
- {
+ {
// Any arriving credits from before failover won't arrive, so we re-initialise
semaphore.drainPermits();
arriving = 0;
-
+
checkCredits(windowSize * 2);
}
@@ -123,6 +123,7 @@
private void requestCredits(final int credits)
{
+
session.sendProducerCreditsMessage(credits, destination);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-28 03:29:29 UTC (rev 8440)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-28 08:42:53 UTC (rev 8441)
@@ -843,13 +843,13 @@
conn.write(buffer, false);
- int clientWindowSize = calcWindowSize(entry.getValue().getClientWindowSize());
-
+ int clientWindowSize = entry.getValue().getClientWindowSize();
+
if (clientWindowSize != 0)
{
SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
clientWindowSize);
-
+
packet.setChannelID(channel.getID());
buffer = packet.encode(channel.getConnection());
@@ -902,6 +902,9 @@
if (resetCreditManager)
{
producerCreditManager.reset();
+
+ //Also need to send more credits for consumers, otherwise the system could hand with the server
+ //not having any credits to send
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-28 03:29:29 UTC (rev 8440)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-28 08:42:53 UTC (rev 8441)
@@ -99,7 +99,7 @@
// Public --------------------------------------------------------
public TimedBuffer(final int size, final long timeout, final boolean flushOnSync, final boolean logRates)
- {
+ {
this.bufferSize = size;
this.logRates = logRates;
if (logRates)
@@ -131,7 +131,6 @@
timerThread.start();
- log.info("log rates " + logRates);
if (logRates)
{
logRatesTimerTask = new LogRatesTimerTask();
@@ -263,7 +262,7 @@
flush();
}
}
-
+
if (buffer.writerIndex() == bufferLimit)
{
flush();
@@ -273,47 +272,47 @@
public void flush()
{
ByteBuffer bufferToFlush = null;
-
+
boolean useSync = false;
-
+
List<IOAsyncTask> callbacksToCall = null;
-
+
synchronized (this)
{
if (buffer.writerIndex() > 0)
{
latchTimer.up();
-
+
int pos = buffer.writerIndex();
-
+
if (logRates)
{
bytesFlushed += pos;
}
-
+
bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
-
+
// Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
// Using bufferToFlush.put(buffer) would make several append calls for each byte
-
+
bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
callbacksToCall = callbacks;
-
+
callbacks = new LinkedList<IOAsyncTask>();
-
+
useSync = pendingSync;
-
+
active = false;
pendingSync = false;
-
+
buffer.clear();
bufferLimit = 0;
flushesDone++;
}
}
-
+
// Execute the flush outside of the lock
// This is important for NIO performance while we are using NIO Callbacks
if (bufferToFlush != null)
@@ -339,7 +338,7 @@
{
if (bufferObserver != null)
{
- flush();
+ flush();
}
}
finally
@@ -369,7 +368,7 @@
{
double rate = 1000 * ((double)bytesFlushed) / (now - lastExecution);
log.info("Write rate = " + rate + " bytes / sec or " + (long)(rate / (1024 * 1024)) + " MiB / sec");
- double flushRate = 1000 * ((double)flushesDone) / (now - lastExecution);
+ double flushRate = 1000 * ((double)flushesDone) / (now - lastExecution);
log.info("Flush rate = " + flushRate + " flushes / sec");
}
@@ -377,7 +376,7 @@
bytesFlushed = 0;
- flushesDone = 0;
+ flushesDone = 0;
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-28 03:29:29 UTC (rev 8440)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-28 08:42:53 UTC (rev 8441)
@@ -186,7 +186,7 @@
public HandleStatus handle(final MessageReference ref) throws Exception
{
if (availableCredits != null && availableCredits.get() <= 0)
- {
+ {
return HandleStatus.BUSY;
}
@@ -416,6 +416,8 @@
public void receiveCredits(final int credits) throws Exception
{
+
+
if (credits == -1)
{
// No flow control
@@ -433,7 +435,7 @@
" currentValue = " +
availableCredits.get());
}
-
+
if (previous <= 0 && previous + credits > 0)
{
promptDelivery(true);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2009-11-28 03:29:29 UTC (rev 8440)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2009-11-28 08:42:53 UTC (rev 8441)
@@ -85,7 +85,18 @@
jbcf.setBlockOnPersistentSend(true);
jbcf.setBlockOnNonPersistentSend(true);
+
+ //Note we set consumer window size to a value so we can verify that consumer credit re-sending
+ //works properly on failover
+ //The value is small enough that credits will have to be resent several time
+
+ final int numMessages = 10;
+
+ final int bodySize = 1000;
+
+ jbcf.setConsumerWindowSize(numMessages * bodySize / 10);
+
Connection conn = jbcf.createConnection();
MyExceptionListener listener = new MyExceptionListener();
@@ -104,8 +115,7 @@
Queue queue = sess.createQueue("myqueue");
- final int numMessages = 1000;
-
+
MessageProducer producer = sess.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
15 years
JBoss hornetq SVN: r8440 - trunk/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-27 22:29:29 -0500 (Fri, 27 Nov 2009)
New Revision: 8440
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
Log:
Tweak... There's always an executor on the context now.. Removing one extra check that is not necessary
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-28 03:15:46 UTC (rev 8439)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-28 03:29:29 UTC (rev 8440)
@@ -176,23 +176,14 @@
TaskHolder holder = iter.next();
if (stored >= holder.storeLined && replicated >= holder.replicationLined)
{
- if (executor != null)
- {
- // If set, we use an executor to avoid the server being single threaded
- execute(holder.task);
- }
- else
- {
- holder.task.done();
- }
+ // If set, we use an executor to avoid the server being single threaded
+ execute(holder.task);
iter.remove();
}
else
{
- // The actions need to be done in order...
- // And they are added in order...
- // As soon as we're done, we break the loop
+ // End of list here. No other task will be completed after this
break;
}
}
15 years
JBoss hornetq SVN: r8439 - trunk/tests/src/org/hornetq/tests/integration/cluster/bridge.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-27 22:15:46 -0500 (Fri, 27 Nov 2009)
New Revision: 8439
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
Log:
oops.. just a tweak again..
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-11-28 03:15:10 UTC (rev 8438)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-11-28 03:15:46 UTC (rev 8439)
@@ -63,7 +63,7 @@
}
}
- servers.clear();
+ servers = null;
super.tearDown();
}
15 years
JBoss hornetq SVN: r8438 - trunk/tests/src/org/hornetq/tests/integration/cluster/bridge.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-27 22:15:10 -0500 (Fri, 27 Nov 2009)
New Revision: 8438
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
Log:
just a tweak
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-11-28 03:03:48 UTC (rev 8437)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-11-28 03:15:10 UTC (rev 8438)
@@ -63,6 +63,8 @@
}
}
+ servers.clear();
+
super.tearDown();
}
15 years
JBoss hornetq SVN: r8437 - trunk/tests/src/org/hornetq/tests/integration/cluster/bridge.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-27 22:03:48 -0500 (Fri, 27 Nov 2009)
New Revision: 8437
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
Log:
Server should stop servers in case of failure so other tests will proceed
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-11-28 02:43:42 UTC (rev 8436)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-11-28 03:03:48 UTC (rev 8437)
@@ -17,8 +17,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
@@ -31,7 +29,6 @@
import org.hornetq.core.config.cluster.QueueConfiguration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -74,7 +71,7 @@
return InVMConnectorFactory.class.getName();
}
}
-
+
// Fail bridge and reconnecting immediately
public void testFailoverAndReconnectImmediately() throws Exception
{
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-11-28 02:43:42 UTC (rev 8436)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-11-28 03:03:48 UTC (rev 8437)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration.cluster.bridge;
+import java.util.ArrayList;
import java.util.Map;
import org.hornetq.core.config.Configuration;
@@ -34,10 +35,43 @@
*/
public abstract class BridgeTestBase extends UnitTestCase
{
+
+ private ArrayList<HornetQServer> servers;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ servers = new ArrayList<HornetQServer>();
+ }
+
+ public void tearDown() throws Exception
+ {
+ for (HornetQServer server: servers)
+ {
+ try
+ {
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ catch (Throwable e)
+ {
+ // System.out -> junit report
+ System.out.println("Error while stopping server:");
+ e.printStackTrace(System.out);
+ }
+ }
+
+ super.tearDown();
+ }
+
protected HornetQServer createHornetQServer(final int id, final boolean netty, final Map<String, Object> params)
{
return createHornetQServer(id, params, netty, false);
}
+
+
protected HornetQServer createHornetQServer(final int id,
final Map<String, Object> params,
@@ -71,6 +105,9 @@
.add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", params));
}
HornetQServer service = HornetQ.newHornetQServer(serviceConf, true);
+
+ servers.add(service);
+
return service;
}
15 years
JBoss hornetq SVN: r8436 - in trunk/src/main/org/hornetq/core: journal/impl/dataformat and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-27 21:43:42 -0500 (Fri, 27 Nov 2009)
New Revision: 8436
Added:
trunk/src/main/org/hornetq/core/journal/impl/dataformat/ByteArrayEncoding.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java
Removed:
trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java
Modified:
trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
Log:
tweaks
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2009-11-28 02:30:52 UTC (rev 8435)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -22,7 +22,9 @@
import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.Pair;
@@ -153,11 +155,11 @@
}
- InternalEncoder controlRecord = new JournalAddRecord(true,
- 1,
- (byte)0,
- new JournalImpl.ByteArrayEncoding(filesToRename.toByteBuffer()
- .array()));
+ JournalInternalRecord controlRecord = new JournalAddRecord(true,
+ 1,
+ (byte)0,
+ new ByteArrayEncoding(filesToRename.toByteBuffer()
+ .array()));
controlRecord.setFileID(-1);
@@ -235,13 +237,13 @@
return writingChannel;
}
- protected void writeEncoder(InternalEncoder record) throws Exception
+ protected void writeEncoder(JournalInternalRecord record) throws Exception
{
record.setFileID(fileID);
record.encode(getWritingChannel());
}
- protected void writeEncoder(InternalEncoder record, int txcounter) throws Exception
+ protected void writeEncoder(JournalInternalRecord record, int txcounter) throws Exception
{
record.setNumberOfRecords(txcounter);
writeEncoder(record);
Deleted: trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java 2009-11-28 02:30:52 UTC (rev 8435)
+++ trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -1,56 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.journal.impl;
-
-import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.journal.EncodingSupport;
-
-/**
- * A InternalEncoder
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public abstract class InternalEncoder implements EncodingSupport
-{
-
- protected int fileID;
-
- public int getFileID()
- {
- return fileID;
- }
-
- public void setFileID(int fileID)
- {
- this.fileID = fileID;
- }
-
- public void decode(HornetQBuffer buffer)
- {
- }
-
- public void setNumberOfRecords(int records)
- {
- }
-
- public int getNumberOfRecords()
- {
- return 0;
- }
-
- public abstract int getEncodeSize();
-}
-
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2009-11-28 02:30:52 UTC (rev 8435)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -20,6 +20,7 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
@@ -80,7 +81,7 @@
writeEncoder(new JournalAddRecord(true,
info.id,
info.getUserRecordType(),
- new JournalImpl.ByteArrayEncoding(info.data)));
+ new ByteArrayEncoding(info.data)));
}
}
@@ -97,7 +98,7 @@
transactionID,
recordInfo.id,
recordInfo.getUserRecordType(),
- new JournalImpl.ByteArrayEncoding(recordInfo.data)));
+ new ByteArrayEncoding(recordInfo.data)));
}
}
@@ -128,7 +129,7 @@
writeEncoder(new JournalDeleteRecordTX(transactionID,
recordInfo.id,
- new JournalImpl.ByteArrayEncoding(recordInfo.data)));
+ new ByteArrayEncoding(recordInfo.data)));
}
/* (non-Javadoc)
@@ -138,7 +139,7 @@
{
int txcounter = getTransactionCounter(transactionID);
- writeEncoder(new JournalCompleteRecordTX(false, transactionID, new JournalImpl.ByteArrayEncoding(extraData)),
+ writeEncoder(new JournalCompleteRecordTX(false, transactionID, new ByteArrayEncoding(extraData)),
txcounter);
}
@@ -160,7 +161,7 @@
writeEncoder(new JournalAddRecord(false,
recordInfo.id,
recordInfo.userRecordType,
- new JournalImpl.ByteArrayEncoding(recordInfo.data)));
+ new ByteArrayEncoding(recordInfo.data)));
}
}
@@ -177,7 +178,7 @@
transactionID,
recordInfo.id,
recordInfo.userRecordType,
- new JournalImpl.ByteArrayEncoding(recordInfo.data)));
+ new ByteArrayEncoding(recordInfo.data)));
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-11-28 02:30:52 UTC (rev 8435)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -27,10 +27,12 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
+import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.Pair;
@@ -250,10 +252,10 @@
{
if (lookupRecord(info.id))
{
- InternalEncoder addRecord = new JournalAddRecord(true,
+ JournalInternalRecord addRecord = new JournalAddRecord(true,
info.id,
info.getUserRecordType(),
- new JournalImpl.ByteArrayEncoding(info.data));
+ new ByteArrayEncoding(info.data));
checkSize(addRecord.getEncodeSize());
@@ -269,11 +271,11 @@
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- InternalEncoder record = new JournalAddRecordTX(true,
+ JournalInternalRecord record = new JournalAddRecordTX(true,
transactionID,
info.id,
info.getUserRecordType(),
- new JournalImpl.ByteArrayEncoding(info.data));
+ new ByteArrayEncoding(info.data));
checkSize(record.getEncodeSize());
@@ -315,9 +317,9 @@
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- InternalEncoder record = new JournalDeleteRecordTX(transactionID,
+ JournalInternalRecord record = new JournalDeleteRecordTX(transactionID,
info.id,
- new JournalImpl.ByteArrayEncoding(info.data));
+ new ByteArrayEncoding(info.data));
checkSize(record.getEncodeSize());
@@ -340,9 +342,9 @@
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- InternalEncoder prepareRecord = new JournalCompleteRecordTX(false,
+ JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(false,
transactionID,
- new JournalImpl.ByteArrayEncoding(extraData));
+ new ByteArrayEncoding(extraData));
checkSize(prepareRecord.getEncodeSize());
@@ -367,10 +369,10 @@
{
if (lookupRecord(info.id))
{
- InternalEncoder updateRecord = new JournalAddRecord(false,
+ JournalInternalRecord updateRecord = new JournalAddRecord(false,
info.id,
info.userRecordType,
- new JournalImpl.ByteArrayEncoding(info.data));
+ new ByteArrayEncoding(info.data));
checkSize(updateRecord.getEncodeSize());
@@ -395,11 +397,11 @@
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- InternalEncoder updateRecordTX = new JournalAddRecordTX(false,
+ JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false,
transactionID,
info.id,
info.userRecordType,
- new JournalImpl.ByteArrayEncoding(info.data));
+ new ByteArrayEncoding(info.data));
checkSize(updateRecordTX.getEncodeSize());
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-28 02:30:52 UTC (rev 8435)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -54,11 +54,13 @@
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TestableJournal;
import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
@@ -672,7 +674,7 @@
try
{
- InternalEncoder addRecord = new JournalAddRecord(true, id, recordType, record);
+ JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
if (callback != null)
{
@@ -744,7 +746,7 @@
}
}
- InternalEncoder updateRecord = new JournalAddRecord(false, id, recordType, record);
+ JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
if (callback != null)
{
@@ -817,7 +819,7 @@
}
}
- InternalEncoder deleteRecord = new JournalDeleteRecord(id);
+ JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
if (callback != null)
{
@@ -877,7 +879,7 @@
try
{
- InternalEncoder addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+ JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
JournalTransaction tx = getTransactionInfo(txID);
@@ -926,7 +928,7 @@
try
{
- InternalEncoder updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
+ JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
JournalTransaction tx = getTransactionInfo(txID);
@@ -969,7 +971,7 @@
try
{
- InternalEncoder deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
+ JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
JournalTransaction tx = getTransactionInfo(txID);
@@ -1054,7 +1056,7 @@
try
{
- InternalEncoder prepareRecord = new JournalCompleteRecordTX(false, txID, transactionData);
+ JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(false, txID, transactionData);
if (callback != null)
{
@@ -1134,7 +1136,7 @@
throw new IllegalStateException("Cannot find tx with id " + txID);
}
- InternalEncoder commitRecord = new JournalCompleteRecordTX(true, txID, null);
+ JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true, txID, null);
if (callback != null)
{
@@ -1194,7 +1196,7 @@
throw new IllegalStateException("Cannot find tx with id " + txID);
}
- InternalEncoder rollbackRecord = new JournalRollbackRecordTX(txID);
+ JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
if (callback != null)
{
@@ -2697,7 +2699,7 @@
*
* @param completeTransaction If the appendRecord is for a prepare or commit, where we should update the number of pendingTransactions on the current file
* */
- private JournalFile appendRecord(final InternalEncoder encoder,
+ private JournalFile appendRecord(final JournalInternalRecord encoder,
final boolean completeTransaction,
final boolean sync,
final JournalTransaction tx,
@@ -3318,34 +3320,6 @@
}
- public static class ByteArrayEncoding implements EncodingSupport
- {
-
- final byte[] data;
-
- public ByteArrayEncoding(final byte[] data)
- {
- this.data = data;
- }
-
- // Public --------------------------------------------------------
-
- public void decode(final HornetQBuffer buffer)
- {
- throw new IllegalStateException("operation not supported");
- }
-
- public void encode(final HornetQBuffer buffer)
- {
- buffer.writeBytes(data);
- }
-
- public int getEncodeSize()
- {
- return data.length;
- }
- }
-
// Used on Load
private static class TransactionHolder
{
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2009-11-28 02:30:52 UTC (rev 8435)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -23,6 +23,7 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
import org.hornetq.utils.DataConstants;
/**
@@ -184,7 +185,7 @@
* @param currentFile
* @param bb
*/
- public void fillNumberOfRecords(final JournalFile currentFile, final InternalEncoder data)
+ public void fillNumberOfRecords(final JournalFile currentFile, final JournalInternalRecord data)
{
data.setNumberOfRecords(getCounter(currentFile));
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-28 02:30:52 UTC (rev 8435)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -27,6 +27,7 @@
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.VariableLatch;
@@ -233,7 +234,7 @@
public synchronized void addBytes(final HornetQBuffer bytes, final boolean sync, final IOAsyncTask callback)
{
- addBytes(new JournalImpl.ByteArrayEncoding(bytes.toByteBuffer().array()), sync, callback);
+ addBytes(new ByteArrayEncoding(bytes.toByteBuffer().array()), sync, callback);
}
public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/ByteArrayEncoding.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/ByteArrayEncoding.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/ByteArrayEncoding.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+
+/**
+ * A ByteArrayEncoding
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ByteArrayEncoding implements EncodingSupport
+{
+
+ final byte[] data;
+
+ public ByteArrayEncoding(final byte[] data)
+ {
+ this.data = data;
+ }
+
+ // Public --------------------------------------------------------
+
+ public void decode(final HornetQBuffer buffer)
+ {
+ throw new IllegalStateException("operation not supported");
+ }
+
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeBytes(data);
+ }
+
+ public int getEncodeSize()
+ {
+ return data.length;
+ }
+}
+
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java 2009-11-28 02:30:52 UTC (rev 8435)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -15,7 +15,6 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.journal.impl.InternalEncoder;
import org.hornetq.core.journal.impl.JournalImpl;
/**
@@ -25,7 +24,7 @@
*
*
*/
-public class JournalAddRecord extends InternalEncoder
+public class JournalAddRecord extends JournalInternalRecord
{
private final long id;
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java 2009-11-28 02:30:52 UTC (rev 8435)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -15,7 +15,6 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.journal.impl.InternalEncoder;
import org.hornetq.core.journal.impl.JournalImpl;
/**
@@ -25,7 +24,7 @@
*
*
*/
-public class JournalAddRecordTX extends InternalEncoder
+public class JournalAddRecordTX extends JournalInternalRecord
{
private final long txID;
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2009-11-28 02:30:52 UTC (rev 8435)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -15,7 +15,6 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.journal.impl.InternalEncoder;
import org.hornetq.core.journal.impl.JournalImpl;
/**
@@ -37,7 +36,7 @@
*
*
*/
-public class JournalCompleteRecordTX extends InternalEncoder
+public class JournalCompleteRecordTX extends JournalInternalRecord
{
private final boolean isCommit;
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java 2009-11-28 02:30:52 UTC (rev 8435)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -14,7 +14,6 @@
package org.hornetq.core.journal.impl.dataformat;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.journal.impl.InternalEncoder;
import org.hornetq.core.journal.impl.JournalImpl;
/**
@@ -24,7 +23,7 @@
*
*
*/
-public class JournalDeleteRecord extends InternalEncoder
+public class JournalDeleteRecord extends JournalInternalRecord
{
private final long id;
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java 2009-11-28 02:30:52 UTC (rev 8435)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -15,7 +15,6 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.journal.impl.InternalEncoder;
import org.hornetq.core.journal.impl.JournalImpl;
/**
@@ -25,7 +24,7 @@
*
*
*/
-public class JournalDeleteRecordTX extends InternalEncoder
+public class JournalDeleteRecordTX extends JournalInternalRecord
{
private final long txID;
Copied: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java (from rev 8435, trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java)
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+
+/**
+ * A InternalEncoder
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class JournalInternalRecord implements EncodingSupport
+{
+
+ protected int fileID;
+
+ public int getFileID()
+ {
+ return fileID;
+ }
+
+ public void setFileID(int fileID)
+ {
+ this.fileID = fileID;
+ }
+
+ public void decode(HornetQBuffer buffer)
+ {
+ }
+
+ public void setNumberOfRecords(int records)
+ {
+ }
+
+ public int getNumberOfRecords()
+ {
+ return 0;
+ }
+
+ public abstract int getEncodeSize();
+}
+
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2009-11-28 02:30:52 UTC (rev 8435)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -14,7 +14,6 @@
package org.hornetq.core.journal.impl.dataformat;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.journal.impl.InternalEncoder;
import org.hornetq.core.journal.impl.JournalImpl;
/**
@@ -24,7 +23,7 @@
*
*
*/
-public class JournalRollbackRecordTX extends InternalEncoder
+public class JournalRollbackRecordTX extends JournalInternalRecord
{
private final long txID;
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-11-28 02:30:52 UTC (rev 8435)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-11-28 02:43:42 UTC (rev 8436)
@@ -23,7 +23,7 @@
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
-import org.hornetq.core.journal.impl.JournalImpl.ByteArrayEncoding;
+import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.replication.ReplicationManager;
15 years
JBoss hornetq SVN: r8435 - in trunk: src/main/org/hornetq/core/journal/impl and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-27 21:30:52 -0500 (Fri, 27 Nov 2009)
New Revision: 8435
Added:
trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
Modified:
trunk/src/main/org/hornetq/core/journal/SequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Optimization on journal - removing one unecessary buffer copy that was introduced after the TimedBuffer
Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -60,6 +60,10 @@
void write(HornetQBuffer bytes, boolean sync) throws Exception;
+ void write(EncodingSupport bytes, boolean sync, IOAsyncTask callback) throws Exception;
+
+ void write(EncodingSupport bytes, boolean sync) throws Exception;
+
/** Write directly to the file without using any buffer */
void writeDirect(ByteBuffer bytes, boolean sync, IOAsyncTask callback);
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -22,6 +22,7 @@
import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.Pair;
@@ -151,12 +152,16 @@
}
}
- JournalImpl.writeAddRecord(-1,
- 1,
- (byte)0,
- new JournalImpl.ByteArrayEncoding(filesToRename.toByteBuffer().array()),
- JournalImpl.SIZE_ADD_RECORD + filesToRename.toByteBuffer().array().length,
- renameBuffer);
+
+ InternalEncoder controlRecord = new JournalAddRecord(true,
+ 1,
+ (byte)0,
+ new JournalImpl.ByteArrayEncoding(filesToRename.toByteBuffer()
+ .array()));
+
+ controlRecord.setFileID(-1);
+
+ controlRecord.encode(renameBuffer);
ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
@@ -229,7 +234,21 @@
{
return writingChannel;
}
+
+ protected void writeEncoder(InternalEncoder record) throws Exception
+ {
+ record.setFileID(fileID);
+ record.encode(getWritingChannel());
+ }
+ protected void writeEncoder(InternalEncoder record, int txcounter) throws Exception
+ {
+ record.setNumberOfRecords(txcounter);
+ writeEncoder(record);
+ }
+
+
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -21,7 +21,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -223,7 +225,44 @@
write(bytes, false, DummyCallback.getInstance());
}
}
+
+ public void write(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback) throws Exception
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.addBytes(bytes, sync, callback);
+ }
+ else
+ {
+ ByteBuffer buffer = factory.newBuffer(bytes.getEncodeSize());
+
+ // If not using the TimedBuffer, a final copy is necessary
+ // Because AIO will need a specific Buffer
+ // And NIO will also need a whole buffer to perform the write
+
+ HornetQBuffer outBuffer = HornetQBuffers.wrappedBuffer(buffer);
+ bytes.encode(outBuffer);
+ buffer.rewind();
+ writeDirect(buffer, sync, callback);
+ }
+ }
+ public void write(final EncodingSupport bytes, final boolean sync) throws Exception
+ {
+ if (sync)
+ {
+ SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+
+ write(bytes, true, completion);
+
+ completion.waitCompletion();
+ }
+ else
+ {
+ write(bytes, false, DummyCallback.getInstance());
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Added: trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+
+/**
+ * A InternalEncoder
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class InternalEncoder implements EncodingSupport
+{
+
+ protected int fileID;
+
+ public int getFileID()
+ {
+ return fileID;
+ }
+
+ public void setFileID(int fileID)
+ {
+ this.fileID = fileID;
+ }
+
+ public void decode(HornetQBuffer buffer)
+ {
+ }
+
+ public void setNumberOfRecords(int records)
+ {
+ }
+
+ public int getNumberOfRecords()
+ {
+ return 0;
+ }
+
+ public abstract int getEncodeSize();
+}
+
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -20,7 +20,12 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.utils.DataConstants;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
/**
* A JournalCleaner
@@ -72,14 +77,10 @@
{
if (lookupRecord(info.id))
{
- int size = JournalImpl.SIZE_ADD_RECORD + info.data.length;
-
- JournalImpl.writeAddRecord(fileID,
- info.id,
- info.getUserRecordType(),
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- getWritingChannel());
+ writeEncoder(new JournalAddRecord(true,
+ info.id,
+ info.getUserRecordType(),
+ new JournalImpl.ByteArrayEncoding(info.data)));
}
}
@@ -92,15 +93,11 @@
{
incrementTransactionCounter(transactionID);
- int size = JournalImpl.SIZE_ADD_RECORD_TX + recordInfo.data.length;
-
- JournalImpl.writeAddRecordTX(fileID,
- transactionID,
- recordInfo.id,
- recordInfo.getUserRecordType(),
- new JournalImpl.ByteArrayEncoding(recordInfo.data),
- size,
- getWritingChannel());
+ writeEncoder(new JournalAddRecordTX(true,
+ transactionID,
+ recordInfo.id,
+ recordInfo.getUserRecordType(),
+ new JournalImpl.ByteArrayEncoding(recordInfo.data)));
}
}
@@ -111,14 +108,7 @@
{
int txcounter = getTransactionCounter(transactionID);
- JournalImpl.writeTransaction(fileID,
- JournalImpl.COMMIT_RECORD,
- transactionID,
- null,
- JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD,
- txcounter,
- getWritingChannel());
-
+ writeEncoder(new JournalCompleteRecordTX(true, transactionID, null), txcounter);
}
/* (non-Javadoc)
@@ -126,7 +116,7 @@
*/
public void onReadDeleteRecord(final long recordID) throws Exception
{
- JournalImpl.writeDeleteRecord(fileID, recordID, JournalImpl.SIZE_DELETE_RECORD, getWritingChannel());
+ writeEncoder(new JournalDeleteRecord(recordID));
}
/* (non-Javadoc)
@@ -134,16 +124,11 @@
*/
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
{
- int size = JournalImpl.SIZE_DELETE_RECORD_TX + recordInfo.data.length;
-
incrementTransactionCounter(transactionID);
- JournalImpl.writeDeleteRecordTransactional(fileID,
- transactionID,
- recordInfo.id,
- new JournalImpl.ByteArrayEncoding(recordInfo.data),
- size,
- getWritingChannel());
+ writeEncoder(new JournalDeleteRecordTX(transactionID,
+ recordInfo.id,
+ new JournalImpl.ByteArrayEncoding(recordInfo.data)));
}
/* (non-Javadoc)
@@ -153,15 +138,8 @@
{
int txcounter = getTransactionCounter(transactionID);
- int size = JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + extraData.length + DataConstants.SIZE_INT;
-
- JournalImpl.writeTransaction(fileID,
- JournalImpl.PREPARE_RECORD,
- transactionID,
- new JournalImpl.ByteArrayEncoding(extraData),
- size,
- txcounter,
- getWritingChannel());
+ writeEncoder(new JournalCompleteRecordTX(false, transactionID, new JournalImpl.ByteArrayEncoding(extraData)),
+ txcounter);
}
/* (non-Javadoc)
@@ -169,7 +147,7 @@
*/
public void onReadRollbackRecord(final long transactionID) throws Exception
{
- JournalImpl.writeRollback(fileID, transactionID, getWritingChannel());
+ writeEncoder(new JournalRollbackRecordTX(transactionID));
}
/* (non-Javadoc)
@@ -179,13 +157,10 @@
{
if (lookupRecord(recordInfo.id))
{
- int size = JournalImpl.SIZE_UPDATE_RECORD + recordInfo.data.length;
- JournalImpl.writeUpdateRecord(fileID,
- recordInfo.id,
- recordInfo.userRecordType,
- new JournalImpl.ByteArrayEncoding(recordInfo.data),
- size,
- getWritingChannel());
+ writeEncoder(new JournalAddRecord(false,
+ recordInfo.id,
+ recordInfo.userRecordType,
+ new JournalImpl.ByteArrayEncoding(recordInfo.data)));
}
}
@@ -197,14 +172,12 @@
if (lookupRecord(recordInfo.id))
{
incrementTransactionCounter(transactionID);
- int size = JournalImpl.SIZE_UPDATE_RECORD_TX + recordInfo.data.length;
- JournalImpl.writeUpdateRecordTX(fileID,
- transactionID,
- recordInfo.id,
- recordInfo.userRecordType,
- new JournalImpl.ByteArrayEncoding(recordInfo.data),
- size,
- getWritingChannel());
+
+ writeEncoder(new JournalAddRecordTX(false,
+ transactionID,
+ recordInfo.id,
+ recordInfo.userRecordType,
+ new JournalImpl.ByteArrayEncoding(recordInfo.data)));
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -27,6 +27,10 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.Pair;
@@ -246,18 +250,16 @@
{
if (lookupRecord(info.id))
{
- int size = JournalImpl.SIZE_ADD_RECORD + info.data.length;
+ InternalEncoder addRecord = new JournalAddRecord(true,
+ info.id,
+ info.getUserRecordType(),
+ new JournalImpl.ByteArrayEncoding(info.data));
+
+ checkSize(addRecord.getEncodeSize());
- checkSize(size);
+ writeEncoder(addRecord);
- JournalImpl.writeAddRecord(fileID,
- info.id,
- info.getUserRecordType(),
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- getWritingChannel());
-
- newRecords.put(info.id, new JournalRecord(currentFile, size));
+ newRecords.put(info.id, new JournalRecord(currentFile, addRecord.getEncodeSize()));
}
}
@@ -267,19 +269,17 @@
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- int size = JournalImpl.SIZE_ADD_RECORD_TX + info.data.length;
+ InternalEncoder record = new JournalAddRecordTX(true,
+ transactionID,
+ info.id,
+ info.getUserRecordType(),
+ new JournalImpl.ByteArrayEncoding(info.data));
+
+ checkSize(record.getEncodeSize());
- checkSize(size);
+ newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
- newTransaction.addPositive(currentFile, info.id, size);
-
- JournalImpl.writeAddRecordTX(fileID,
- transactionID,
- info.id,
- info.getUserRecordType(),
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- getWritingChannel());
+ writeEncoder(record);
}
else
{
@@ -315,17 +315,14 @@
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- int size = JournalImpl.SIZE_DELETE_RECORD_TX + info.data.length;
+ InternalEncoder record = new JournalDeleteRecordTX(transactionID,
+ info.id,
+ new JournalImpl.ByteArrayEncoding(info.data));
- checkSize(size);
+ checkSize(record.getEncodeSize());
+
+ writeEncoder(record);
- JournalImpl.writeDeleteRecordTransactional(fileID,
- transactionID,
- info.id,
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- getWritingChannel());
-
newTransaction.addNegative(currentFile, info.id);
}
// else.. nothing to be done
@@ -343,17 +340,13 @@
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- int size = JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + extraData.length + DataConstants.SIZE_INT;
+ InternalEncoder prepareRecord = new JournalCompleteRecordTX(false,
+ transactionID,
+ new JournalImpl.ByteArrayEncoding(extraData));
- checkSize(size);
+ checkSize(prepareRecord.getEncodeSize());
- JournalImpl.writeTransaction(fileID,
- JournalImpl.PREPARE_RECORD,
- transactionID,
- new JournalImpl.ByteArrayEncoding(extraData),
- size,
- newTransaction.getCounter(currentFile),
- getWritingChannel());
+ writeEncoder(prepareRecord, newTransaction.getCounter(currentFile));
newTransaction.prepare(currentFile);
@@ -374,9 +367,12 @@
{
if (lookupRecord(info.id))
{
- int size = JournalImpl.SIZE_UPDATE_RECORD + info.data.length;
+ InternalEncoder updateRecord = new JournalAddRecord(false,
+ info.id,
+ info.userRecordType,
+ new JournalImpl.ByteArrayEncoding(info.data));
- checkSize(size);
+ checkSize(updateRecord.getEncodeSize());
JournalRecord newRecord = newRecords.get(info.id);
@@ -386,16 +382,10 @@
}
else
{
- newRecord.addUpdateFile(currentFile, size);
+ newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize());
}
-
- JournalImpl.writeUpdateRecord(fileID,
- info.id,
- info.userRecordType,
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- getWritingChannel());
-
+
+ writeEncoder(updateRecord);
}
}
@@ -405,19 +395,18 @@
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- int size = JournalImpl.SIZE_UPDATE_RECORD_TX + info.data.length;
+ InternalEncoder updateRecordTX = new JournalAddRecordTX(false,
+ transactionID,
+ info.id,
+ info.userRecordType,
+ new JournalImpl.ByteArrayEncoding(info.data));
- checkSize(size);
+
+ checkSize(updateRecordTX.getEncodeSize());
- JournalImpl.writeUpdateRecordTX(fileID,
- transactionID,
- info.id,
- info.userRecordType,
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- getWritingChannel());
-
- newTransaction.addPositive(currentFile, info.id, size);
+ writeEncoder(updateRecordTX);
+
+ newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize());
}
else
{
@@ -425,6 +414,8 @@
}
}
+
+
/**
* @param transactionID
* @return
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -54,6 +54,12 @@
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TestableJournal;
import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.Pair;
@@ -114,10 +120,6 @@
public static final byte ADD_RECORD = 11;
- public static final byte SIZE_UPDATE_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG +
- DataConstants.SIZE_BYTE +
- DataConstants.SIZE_INT /* + record.length */;
-
public static final byte UPDATE_RECORD = 12;
public static final int SIZE_ADD_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG +
@@ -127,11 +129,6 @@
public static final byte ADD_RECORD_TX = 13;
- public static final int SIZE_UPDATE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG +
- DataConstants.SIZE_BYTE +
- DataConstants.SIZE_LONG +
- DataConstants.SIZE_INT /* + record.length */;
-
public static final byte UPDATE_RECORD_TX = 14;
public static final int SIZE_DELETE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG +
@@ -292,207 +289,6 @@
this.maxAIO = maxAIO;
}
- // Public methods (used by package members such as JournalCompactor) (these methods are not part of the JournalImpl
- // interface)
-
- /**
- * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
- * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file.
- * (What could happen if there are too many pendingTransactions, or if an user event delayed pendingTransactions to come in time to a single file).</p>
- * <p>The element-summary will then have</p>
- * <p>FileID1, 10</p>
- * <p>FileID2, 10</p>
- * <p>FileID3, 10</p>
- *
- * <br>
- * <p> During the load, the transaction needs to have 30 pendingTransactions spread across the files as originally written.</p>
- * <p> If for any reason there are missing pendingTransactions, that means the transaction was not completed and we should ignore the whole transaction </p>
- * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed.
- * That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
- *
- * @param recordType
- * @param txID
- * @param tx
- * @param transactionData
- * @return
- * @throws Exception
- */
- public static void writeTransaction(final int fileID,
- final byte recordType,
- final long txID,
- final EncodingSupport transactionData,
- final int size,
- final int numberOfRecords,
- final HornetQBuffer bb) throws Exception
- {
- bb.writeByte(recordType);
- bb.writeInt(fileID); // skip ID part
- bb.writeLong(txID);
- bb.writeInt(numberOfRecords);
-
- if (transactionData != null)
- {
- bb.writeInt(transactionData.getEncodeSize());
- }
-
- if (transactionData != null)
- {
- transactionData.encode(bb);
- }
-
- bb.writeInt(size);
- }
-
- /**
- * @param txID
- * @param id
- * @param recordType
- * @param record
- * @param size
- * @param bb
- */
- public static void writeUpdateRecordTX(final int fileID,
- final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- final int size,
- final HornetQBuffer bb)
- {
- bb.writeByte(UPDATE_RECORD_TX);
- bb.writeInt(fileID);
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
- }
-
- /**
- * @param txID
- * @param bb
- */
- public static void writeRollback(final int fileID, final long txID, HornetQBuffer bb)
- {
- bb.writeByte(ROLLBACK_RECORD);
- bb.writeInt(fileID);
- bb.writeLong(txID);
- bb.writeInt(SIZE_ROLLBACK_RECORD);
- }
-
- /**
- * @param id
- * @param recordType
- * @param record
- * @param size
- * @param bb
- */
- public static void writeUpdateRecord(final int fileId,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- final int size,
- final HornetQBuffer bb)
- {
- bb.writeByte(UPDATE_RECORD);
- bb.writeInt(fileId);
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
- }
-
- /**
- * @param id
- * @param recordType
- * @param record
- * @param size
- * @param bb
- */
- public static void writeAddRecord(final int fileId,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- final int size,
- final HornetQBuffer bb)
- {
- bb.writeByte(ADD_RECORD);
- bb.writeInt(fileId);
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
- }
-
- /**
- * @param id
- * @param size
- * @param bb
- */
- public static void writeDeleteRecord(final int fileId, final long id, int size, HornetQBuffer bb)
- {
- bb.writeByte(DELETE_RECORD);
- bb.writeInt(fileId);
- bb.writeLong(id);
- bb.writeInt(size);
- }
-
- /**
- * @param txID
- * @param id
- * @param record
- * @param size
- * @param bb
- */
- public static void writeDeleteRecordTransactional(final int fileID,
- final long txID,
- final long id,
- final EncodingSupport record,
- final int size,
- final HornetQBuffer bb)
- {
- bb.writeByte(DELETE_RECORD_TX);
- bb.writeInt(fileID);
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(record != null ? record.getEncodeSize() : 0);
- if (record != null)
- {
- record.encode(bb);
- }
- bb.writeInt(size);
- }
-
- /**
- * @param txID
- * @param id
- * @param recordType
- * @param record
- * @param recordLength
- * @param size
- * @param bb
- */
- public static void writeAddRecordTX(final int fileID,
- final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- final int size,
- final HornetQBuffer bb)
- {
- bb.writeByte(ADD_RECORD_TX);
- bb.writeInt(fileID);
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
- }
-
public Map<Long, JournalRecord> getRecords()
{
return records;
@@ -843,7 +639,7 @@
{
appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
-
+
public void appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync, final IOCompletion callback) throws Exception
{
appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
@@ -876,12 +672,8 @@
try
{
- int size = SIZE_ADD_RECORD + record.getEncodeSize();
+ InternalEncoder addRecord = new JournalAddRecord(true, id, recordType, record);
- HornetQBuffer bb = newBuffer(size);
-
- writeAddRecord(-1, id, recordType, record, size, bb); // fileID will be filled later
-
if (callback != null)
{
callback.lineUp();
@@ -890,9 +682,9 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
+ JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
- records.put(id, new JournalRecord(usedFile, size));
+ records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
}
finally
{
@@ -952,12 +744,8 @@
}
}
- int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
+ InternalEncoder updateRecord = new JournalAddRecord(false, id, recordType, record);
- HornetQBuffer bb = newBuffer(size);
-
- writeUpdateRecord(-1, id, recordType, record, size, bb);
-
if (callback != null)
{
callback.lineUp();
@@ -966,17 +754,17 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
+ JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
if (jrnRecord == null)
{
- compactor.addCommandUpdate(id, usedFile, size);
+ compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
}
else
{
- jrnRecord.addUpdateFile(usedFile, size);
+ jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
}
}
finally
@@ -1029,12 +817,8 @@
}
}
- int size = SIZE_DELETE_RECORD;
+ InternalEncoder deleteRecord = new JournalDeleteRecord(id);
- HornetQBuffer bb = newBuffer(size);
-
- writeDeleteRecord(-1, id, size, bb);
-
if (callback != null)
{
callback.lineUp();
@@ -1043,7 +827,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
+ JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
@@ -1093,20 +877,16 @@
try
{
- int size = SIZE_ADD_RECORD_TX + record.getEncodeSize();
+ InternalEncoder addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
- HornetQBuffer bb = newBuffer(size);
-
- writeAddRecordTX(-1, txID, id, recordType, record, size, bb);
-
JournalTransaction tx = getTransactionInfo(txID);
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, false, tx, null);
+ JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
- tx.addPositive(usedFile, id, size);
+ tx.addPositive(usedFile, id, addRecord.getEncodeSize());
}
finally
{
@@ -1146,20 +926,16 @@
try
{
- int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
+ InternalEncoder updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
- HornetQBuffer bb = newBuffer(size);
-
- writeUpdateRecordTX(-1, txID, id, recordType, record, size, bb);
-
JournalTransaction tx = getTransactionInfo(txID);
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, false, tx, null);
+ JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
- tx.addPositive(usedFile, id, size);
+ tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
}
finally
{
@@ -1193,18 +969,14 @@
try
{
- int size = SIZE_DELETE_RECORD_TX + record.getEncodeSize();
+ InternalEncoder deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
- HornetQBuffer bb = newBuffer(size);
-
- writeDeleteRecordTransactional(-1, txID, id, record, size, bb);
-
JournalTransaction tx = getTransactionInfo(txID);
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, false, tx, null);
+ JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
tx.addNegative(usedFile, id);
}
@@ -1282,11 +1054,8 @@
try
{
- int size = SIZE_COMPLETE_TRANSACTION_RECORD + transactionData.getEncodeSize() + DataConstants.SIZE_INT;
- HornetQBuffer bb = newBuffer(size);
+ InternalEncoder prepareRecord = new JournalCompleteRecordTX(false, txID, transactionData);
- writeTransaction(-1, PREPARE_RECORD, txID, transactionData, size, -1, bb);
-
if (callback != null)
{
callback.lineUp();
@@ -1295,7 +1064,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, true, sync, tx, callback);
+ JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
tx.prepare(usedFile);
}
@@ -1365,16 +1134,8 @@
throw new IllegalStateException("Cannot find tx with id " + txID);
}
- HornetQBuffer bb = newBuffer(SIZE_COMPLETE_TRANSACTION_RECORD);
+ InternalEncoder commitRecord = new JournalCompleteRecordTX(true, txID, null);
- writeTransaction(-1,
- COMMIT_RECORD,
- txID,
- null,
- SIZE_COMPLETE_TRANSACTION_RECORD,
- -1 /* number of records on this transaction will be filled later inside append record */,
- bb);
-
if (callback != null)
{
callback.lineUp();
@@ -1383,7 +1144,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, true, sync, tx, callback);
+ JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
tx.commit(usedFile);
}
@@ -1433,10 +1194,8 @@
throw new IllegalStateException("Cannot find tx with id " + txID);
}
- HornetQBuffer bb = newBuffer(SIZE_ROLLBACK_RECORD);
+ InternalEncoder rollbackRecord = new JournalRollbackRecordTX(txID);
- writeRollback(-1, txID, bb);
-
if (callback != null)
{
callback.lineUp();
@@ -1445,7 +1204,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, sync, tx, callback);
+ JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
tx.rollback(usedFile);
}
@@ -1586,6 +1345,7 @@
try
{
+ trace("Starting compacting operation on journal");
log.debug("Starting compacting operation on journal");
// We need to guarantee that the journal is frozen for this short time
@@ -1867,7 +1627,7 @@
// have been deleted
// just leaving some updates in this file
- posFiles.addUpdateFile(file, info.data.length + SIZE_UPDATE_RECORD);
+ posFiles.addUpdateFile(file, info.data.length + SIZE_ADD_RECORD);
}
}
@@ -2337,6 +2097,10 @@
try
{
+ if (trace)
+ {
+ trace("Cleaning up file " + file);
+ }
log.debug("Cleaning up file " + file);
if (file.getPosCount() == 0)
@@ -2847,13 +2611,13 @@
recordSize = SIZE_ADD_RECORD;
break;
case UPDATE_RECORD:
- recordSize = SIZE_UPDATE_RECORD;
+ recordSize = SIZE_ADD_RECORD;
break;
case ADD_RECORD_TX:
recordSize = SIZE_ADD_RECORD_TX;
break;
case UPDATE_RECORD_TX:
- recordSize = SIZE_UPDATE_RECORD_TX;
+ recordSize = SIZE_ADD_RECORD_TX;
break;
case DELETE_RECORD:
recordSize = SIZE_DELETE_RECORD;
@@ -2933,7 +2697,7 @@
*
* @param completeTransaction If the appendRecord is for a prepare or commit, where we should update the number of pendingTransactions on the current file
* */
- private JournalFile appendRecord(final HornetQBuffer bb,
+ private JournalFile appendRecord(final InternalEncoder encoder,
final boolean completeTransaction,
final boolean sync,
final JournalTransaction tx,
@@ -2948,7 +2712,7 @@
final IOAsyncTask callback;
- int size = bb.capacity();
+ int size = encoder.getEncodeSize();
// We take into account the fileID used on the Header
if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
@@ -3012,7 +2776,7 @@
if (completeTransaction)
{
// Filling the number of pendingTransactions at the current file
- tx.fillNumberOfRecords(currentFile, bb);
+ tx.fillNumberOfRecords(currentFile, encoder);
}
}
else
@@ -3021,16 +2785,15 @@
}
// Adding fileID
- bb.writerIndex(DataConstants.SIZE_BYTE);
- bb.writeInt(currentFile.getFileID());
+ encoder.setFileID(currentFile.getFileID());
if (callback != null)
{
- currentFile.getFile().write(bb, sync, callback);
+ currentFile.getFile().write(encoder, sync, callback);
}
else
{
- currentFile.getFile().write(bb, sync);
+ currentFile.getFile().write(encoder, sync);
}
return currentFile;
@@ -3615,7 +3378,7 @@
return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
}
}
-
+
private class PerfBlast extends Thread
{
private final int pages;
@@ -3633,13 +3396,13 @@
{
lockAppend.lock();
- HornetQBuffer bb = newBuffer(490 * 1024);
+// HornetQBuffer bb = newBuffer(128 * 1024);
+//
+// for (int i = 0; i < pages; i++)
+// {
+// appendRecord(bb, false, false, null, null);
+// }
- for (int i = 0; i < pages; i++)
- {
- appendRecord(bb, false, true, null, null);
- }
-
lockAppend.unlock();
}
catch (Exception e)
@@ -3648,5 +3411,11 @@
}
}
}
+
+
+
+
+
+
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -184,12 +184,9 @@
* @param currentFile
* @param bb
*/
- public void fillNumberOfRecords(final JournalFile currentFile, final HornetQBuffer bb)
+ public void fillNumberOfRecords(final JournalFile currentFile, final InternalEncoder data)
{
- bb.writerIndex(DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG);
-
- bb.writeInt(getCounter(currentFile));
-
+ data.setNumberOfRecords(getCounter(currentFile));
}
/** 99.99 % of the times previous files will be already synced, since they are scheduled to be closed.
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -24,7 +24,9 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.buffers.HornetQBuffers;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.VariableLatch;
@@ -231,13 +233,18 @@
public synchronized void addBytes(final HornetQBuffer bytes, final boolean sync, final IOAsyncTask callback)
{
+ addBytes(new JournalImpl.ByteArrayEncoding(bytes.toByteBuffer().array()), sync, callback);
+ }
+
+ public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
+ {
if (buffer.writerIndex() == 0)
{
// Resume latch
latchTimer.down();
}
- buffer.writeBytes(bytes, bytes.capacity());
+ bytes.encode(buffer);
callbacks.add(callback);
Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalAddRecord
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalAddRecord extends InternalEncoder
+{
+
+ private final long id;
+
+ private final EncodingSupport record;
+
+ private final byte recordType;
+
+ private final boolean add;
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param size
+ */
+ public JournalAddRecord(final boolean add, final long id, final byte recordType, final EncodingSupport record)
+ {
+ this.id = id;
+
+ this.record = record;
+
+ this.recordType = recordType;
+
+ this.add = add;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ if (add)
+ {
+ buffer.writeByte(JournalImpl.ADD_RECORD);
+ }
+ else
+ {
+ buffer.writeByte(JournalImpl.UPDATE_RECORD);
+ }
+
+ buffer.writeInt(fileID);
+
+ buffer.writeLong(id);
+
+ buffer.writeInt(record.getEncodeSize());
+
+ buffer.writeByte(recordType);
+
+ record.encode(buffer);
+
+ buffer.writeInt(getEncodeSize());
+ }
+
+ @Override
+ public int getEncodeSize()
+ {
+ return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize();
+ }
+}
\ No newline at end of file
Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalAddRecordTX
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalAddRecordTX extends InternalEncoder
+{
+
+ private final long txID;
+
+ private final long id;
+
+ private final EncodingSupport record;
+
+ private final byte recordType;
+
+ private final boolean add;
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param size
+ */
+ public JournalAddRecordTX(final boolean add,
+ final long txID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record)
+ {
+
+ this.txID = txID;
+
+ this.id = id;
+
+ this.record = record;
+
+ this.recordType = recordType;
+
+ this.add = add;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ if (add)
+ {
+ buffer.writeByte(JournalImpl.ADD_RECORD_TX);
+ }
+ else
+ {
+ buffer.writeByte(JournalImpl.UPDATE_RECORD_TX);
+ }
+
+ buffer.writeInt(fileID);
+
+ buffer.writeLong(txID);
+
+ buffer.writeLong(id);
+
+ buffer.writeInt(record.getEncodeSize());
+
+ buffer.writeByte(recordType);
+
+ record.encode(buffer);
+
+ buffer.writeInt(getEncodeSize());
+ }
+
+ @Override
+ public int getEncodeSize()
+ {
+ return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize();
+ }
+}
Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
+ * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file.
+ * (What could happen if there are too many pendingTransactions, or if an user event delayed pendingTransactions to come in time to a single file).</p>
+ * <p>The element-summary will then have</p>
+ * <p>FileID1, 10</p>
+ * <p>FileID2, 10</p>
+ * <p>FileID3, 10</p>
+ *
+ * <br>
+ * <p> During the load, the transaction needs to have 30 pendingTransactions spread across the files as originally written.</p>
+ * <p> If for any reason there are missing pendingTransactions, that means the transaction was not completed and we should ignore the whole transaction </p>
+ * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed.
+ * That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalCompleteRecordTX extends InternalEncoder
+{
+ private final boolean isCommit;
+
+ private final long txID;
+
+ private final EncodingSupport transactionData;
+
+ private int numberOfRecords;
+
+ public JournalCompleteRecordTX(final boolean isCommit, final long txID, final EncodingSupport transactionData)
+ {
+ this.isCommit = isCommit;
+
+ this.txID = txID;
+
+ this.transactionData = transactionData;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ if (isCommit)
+ {
+ buffer.writeByte(JournalImpl.COMMIT_RECORD);
+ }
+ else
+ {
+ buffer.writeByte(JournalImpl.PREPARE_RECORD);
+ }
+
+ buffer.writeInt(fileID);
+
+ buffer.writeLong(txID);
+
+ buffer.writeInt(numberOfRecords);
+
+ if (transactionData != null)
+ {
+ buffer.writeInt(transactionData.getEncodeSize());
+ }
+
+ if (transactionData != null)
+ {
+ transactionData.encode(buffer);
+ }
+
+ buffer.writeInt(getEncodeSize());
+ }
+
+ @Override
+ public void setNumberOfRecords(final int records)
+ {
+ numberOfRecords = records;
+ }
+
+ @Override
+ public int getNumberOfRecords()
+ {
+ return numberOfRecords;
+ }
+
+ @Override
+ public int getEncodeSize()
+ {
+ if (isCommit)
+ {
+ return JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD;
+ }
+ else
+ {
+ return JournalImpl.SIZE_PREPARE_RECORD + (transactionData != null ? transactionData.getEncodeSize() : 0);
+ }
+ }
+}
Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalDeleteRecord
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalDeleteRecord extends InternalEncoder
+{
+
+ private final long id;
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param size
+ */
+ public JournalDeleteRecord(final long id)
+ {
+ this.id = id;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(JournalImpl.DELETE_RECORD);
+
+ buffer.writeInt(fileID);
+
+ buffer.writeLong(id);
+
+ buffer.writeInt(getEncodeSize());
+ }
+
+ @Override
+ public int getEncodeSize()
+ {
+ return JournalImpl.SIZE_DELETE_RECORD;
+ }
+}
Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalDeleteRecordTX
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalDeleteRecordTX extends InternalEncoder
+{
+
+ private final long txID;
+
+ private final long id;
+
+ private final EncodingSupport record;
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param size
+ */
+ public JournalDeleteRecordTX(final long txID, final long id, final EncodingSupport record)
+ {
+ this.id = id;
+
+ this.txID = txID;
+
+ this.record = record;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(JournalImpl.DELETE_RECORD_TX);
+
+ buffer.writeInt(fileID);
+
+ buffer.writeLong(txID);
+
+ buffer.writeLong(id);
+
+ buffer.writeInt(record != null ? record.getEncodeSize() : 0);
+
+ if (record != null)
+ {
+ record.encode(buffer);
+ }
+
+ buffer.writeInt(getEncodeSize());
+ }
+
+ @Override
+ public int getEncodeSize()
+ {
+ return JournalImpl.SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
+ }
+}
Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalRollbackRecordTX
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalRollbackRecordTX extends InternalEncoder
+{
+ private final long txID;
+
+ public JournalRollbackRecordTX(final long txID)
+ {
+ this.txID = txID;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(JournalImpl.ROLLBACK_RECORD);
+ buffer.writeInt(fileID);
+ buffer.writeLong(txID);
+ buffer.writeInt(JournalImpl.SIZE_ROLLBACK_RECORD);
+
+ }
+
+ @Override
+ public int getEncodeSize()
+ {
+ return JournalImpl.SIZE_ROLLBACK_RECORD;
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -616,7 +616,7 @@
journal.forceMoveNextFile();
update(id);
- expectedSizes.add(recordLength + JournalImpl.SIZE_UPDATE_RECORD);
+ expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD);
journal.forceMoveNextFile();
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -719,7 +719,7 @@
{
final int JOURNAL_SIZE = 2000;
- setupAndLoadJournal(JOURNAL_SIZE, 100);
+ setupAndLoadJournal(JOURNAL_SIZE, 1);
assertEquals(2, factory.listFiles("tt").size());
@@ -759,6 +759,7 @@
// reload will think the record came from a different journal usage)
file.position(100);
+ buffer.rewind();
file.writeDirect(buffer, true);
file.close();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -279,7 +279,7 @@
for (long element : arguments)
{
- byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_UPDATE_RECORD_TX);
+ byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -3118,7 +3118,9 @@
addTx(1, 1);
+ addTx(1, 2);
updateTx(1, 1);
+ updateTx(1, 3);
commit(1);
update(1);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -20,7 +20,9 @@
import java.util.concurrent.ConcurrentHashMap;
import org.hornetq.core.asyncio.BufferCallback;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -622,8 +624,32 @@
bytes.readerIndex(0);
writeDirect(bytes.toByteBuffer(), sync);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFile#write(org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void write(EncodingSupport bytes, boolean sync, IOAsyncTask callback) throws Exception
+ {
+ ByteBuffer buffer = newBuffer(bytes.getEncodeSize());
+ HornetQBuffer outbuffer = HornetQBuffers.wrappedBuffer(buffer);
+ bytes.encode(outbuffer);
+ write(outbuffer, sync, callback);
+ }
/* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFile#write(org.hornetq.core.journal.EncodingSupport, boolean)
+ */
+ public void write(EncodingSupport bytes, boolean sync) throws Exception
+ {
+ ByteBuffer buffer = newBuffer(bytes.getEncodeSize());
+ HornetQBuffer outbuffer = HornetQBuffers.wrappedBuffer(buffer);
+ bytes.encode(outbuffer);
+ write(outbuffer, sync);
+ }
+
+
+
+ /* (non-Javadoc)
* @see org.hornetq.core.journal.SequentialFile#exists()
*/
public boolean exists()
15 years
JBoss hornetq SVN: r8434 - trunk/tests/src/org/hornetq/tests/unit/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-27 20:26:06 -0500 (Fri, 27 Nov 2009)
New Revision: 8434
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
Log:
Adding a new test on journal for debug purposes
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-11-27 21:20:56 UTC (rev 8433)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-11-28 01:26:06 UTC (rev 8434)
@@ -3106,7 +3106,29 @@
assertEquals(0, journal.getDataFilesCount());
}
+
+
+ public void testAddTexThenUpdate() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+
+
+ addTx(1, 1);
+ updateTx(1, 1);
+ commit(1);
+ update(1);
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
protected abstract int getAlignment();
}
15 years
JBoss hornetq SVN: r8433 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-27 16:20:56 -0500 (Fri, 27 Nov 2009)
New Revision: 8433
Modified:
trunk/src/main/org/hornetq/core/persistence/OperationContext.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java
trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-226 - Large Message and Diverts
Modified: trunk/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/OperationContext.java 2009-11-27 21:15:05 UTC (rev 8432)
+++ trunk/src/main/org/hornetq/core/persistence/OperationContext.java 2009-11-27 21:20:56 UTC (rev 8433)
@@ -13,8 +13,6 @@
package org.hornetq.core.persistence;
-import java.util.concurrent.Executor;
-
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.IOCompletion;
@@ -30,10 +28,6 @@
public interface OperationContext extends IOCompletion
{
- /** The executor used on responses.
- * If this is not set, it will use the current thread. */
- void setExecutor(Executor executor);
-
/** Execute the task when all IO operations are complete,
* Or execute it immediately if nothing is pending. */
void executeOnCompletion(IOAsyncTask runnable);
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-27 21:15:05 UTC (rev 8432)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-27 21:20:56 UTC (rev 8433)
@@ -81,7 +81,7 @@
private String errorMessage = null;
- private Executor executor;
+ private final Executor executor;
private final AtomicInteger executorsPending = new AtomicInteger(0);
@@ -102,12 +102,6 @@
replicationLineUp++;
}
- /** this method needs to be called before the executor became operational */
- public void setExecutor(Executor executor)
- {
- this.executor = executor;
- }
-
public synchronized void replicationDone()
{
replicated++;
@@ -137,25 +131,18 @@
// On this case, we can just execute the context directly
if (replicationLineUp == replicated && storeLineUp == stored)
{
- if (executor != null)
+ // We want to avoid the executor if everything is complete...
+ // However, we can't execute the context if there are executions pending
+ // We need to use the executor on this case
+ if (executorsPending.get() == 0)
{
- // We want to avoid the executor if everything is complete...
- // However, we can't execute the context if there are executions pending
- // We need to use the executor on this case
- if (executorsPending.get() == 0)
- {
- // No need to use an executor here or a context switch
- // there are no actions pending.. hence we can just execute the task directly on the same thread
- executeNow = true;
- }
- else
- {
- execute(completion);
- }
+ // No need to use an executor here or a context switch
+ // there are no actions pending.. hence we can just execute the task directly on the same thread
+ executeNow = true;
}
else
{
- executeNow = true;
+ execute(completion);
}
}
else
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-27 21:15:05 UTC (rev 8432)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-27 21:20:56 UTC (rev 8433)
@@ -550,11 +550,6 @@
throw new IllegalStateException("Message cannot be routed more than once");
}
- if (message.getMessageID() == 0l)
- {
- generateID(message);
- }
-
RoutingContext context = new RoutingContextImpl(tx);
SimpleString address = message.getDestination();
Modified: trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-27 21:15:05 UTC (rev 8432)
+++ trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-27 21:20:56 UTC (rev 8433)
@@ -17,6 +17,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Divert;
import org.hornetq.core.server.RoutingContext;
@@ -50,6 +51,8 @@
private final Filter filter;
private final Transformer transformer;
+
+ private final StorageManager storageManager;
public DivertImpl(final SimpleString forwardAddress,
final SimpleString uniqueName,
@@ -57,7 +60,8 @@
final boolean exclusive,
final Filter filter,
final Transformer transformer,
- final PostOffice postOffice)
+ final PostOffice postOffice,
+ final StorageManager storageManager)
{
this.forwardAddress = forwardAddress;
@@ -72,6 +76,8 @@
this.transformer = transformer;
this.postOffice = postOffice;
+
+ this.storageManager = storageManager;
}
public void route(final ServerMessage message, final RoutingContext context) throws Exception
@@ -81,17 +87,16 @@
// We must make a copy of the message, otherwise things like returning credits to the page won't work
// properly on ack, since the original destination will be overwritten
- // TODO we can optimise this so it doesn't copy if it's not routed anywhere else
+ // TODO we can optimise this so it doesn't copy if it's not routed anywhere else
- ServerMessage copy = message.copy();
+ long id = storageManager.generateUniqueID();
+ ServerMessage copy = message.copy(id);
- // Setting the messageID to 0. The postOffice should set a new one
- copy.setMessageID(0);
-
+ // This will set the original MessageId, and the original destination
+ copy.setOriginalHeaders(message, false);
+
copy.setDestination(forwardAddress);
- copy.putStringProperty(HDR_ORIGINAL_DESTINATION, originalDestination);
-
if (transformer != null)
{
copy = transformer.transform(copy);
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-27 21:15:05 UTC (rev 8432)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-27 21:20:56 UTC (rev 8433)
@@ -1414,7 +1414,8 @@
config.isExclusive(),
filter,
transformer,
- postOffice);
+ postOffice,
+ storageManager);
// pagingManager,
// storageManager);
Modified: trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java 2009-11-27 21:15:05 UTC (rev 8432)
+++ trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java 2009-11-27 21:20:56 UTC (rev 8433)
@@ -45,8 +45,20 @@
{
private static final Logger log = Logger.getLogger(DivertTest.class);
+ final int minLargeMessageSize = ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 2;
+
public void testPersistentDivert() throws Exception
{
+ doTestPersistentDivert(false);
+ }
+
+ public void testPersistentDiverLargeMessage() throws Exception
+ {
+ doTestPersistentDivert(true);
+ }
+
+ public void doTestPersistentDivert(boolean largeMessage) throws Exception
+ {
Configuration conf = createDefaultConfig();
conf.setClustered(true);
@@ -121,6 +133,11 @@
{
ClientMessage message = session.createClientMessage(true);
+ if (largeMessage)
+ {
+ message.setBodyInputStream(createFakeLargeStream(minLargeMessageSize));
+ }
+
message.putIntProperty(propKey, i);
producer.send(message);
@@ -128,12 +145,17 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(5000);
assertNotNull(message);
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
message.acknowledge();
}
@@ -141,12 +163,17 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer2.receive(200);
+ ClientMessage message = consumer2.receive(5000);
assertNotNull(message);
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
message.acknowledge();
}
@@ -154,12 +181,17 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer3.receive(200);
+ ClientMessage message = consumer3.receive(5000);
assertNotNull(message);
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
message.acknowledge();
}
@@ -167,12 +199,17 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer4.receive(200);
+ ClientMessage message = consumer4.receive(5000);
assertNotNull(message);
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
message.acknowledge();
}
@@ -183,9 +220,30 @@
messagingService.stop();
}
+
+ /**
+ * @param message
+ */
+ private void checkLargeMessage(ClientMessage message)
+ {
+ for (int j = 0 ; j < minLargeMessageSize; j++)
+ {
+ assertEquals(getSamplebyte(j), message.getBodyBuffer().readByte());
+ }
+ }
public void testPersistentDivertRestartBeforeConsume() throws Exception
{
+ doTestPersistentDivertRestartBeforeConsume(false);
+ }
+
+ public void testPersistentDivertRestartBeforeConsumeLargeMessage() throws Exception
+ {
+ doTestPersistentDivertRestartBeforeConsume(true);
+ }
+
+ public void doTestPersistentDivertRestartBeforeConsume(boolean largeMessage) throws Exception
+ {
Configuration conf = createDefaultConfig();
conf.setClustered(true);
@@ -251,6 +309,12 @@
ClientMessage message = session.createClientMessage(true);
message.putIntProperty(propKey, i);
+
+ if (largeMessage)
+ {
+ message.setBodyInputStream(createFakeLargeStream(minLargeMessageSize));
+ }
+
producer.send(message);
}
@@ -281,10 +345,15 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(5000);
assertNotNull(message);
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
message.acknowledge();
@@ -294,10 +363,15 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer2.receive(200);
+ ClientMessage message = consumer2.receive(5000);
assertNotNull(message);
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
message.acknowledge();
@@ -307,10 +381,15 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer3.receive(200);
+ ClientMessage message = consumer3.receive(5000);
assertNotNull(message);
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
message.acknowledge();
@@ -320,10 +399,15 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer4.receive(200);
+ ClientMessage message = consumer4.receive(5000);
assertNotNull(message);
+ if (largeMessage)
+ {
+ checkLargeMessage(message);
+ }
+
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
message.acknowledge();
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java 2009-11-27 21:15:05 UTC (rev 8432)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java 2009-11-27 21:20:56 UTC (rev 8433)
@@ -112,7 +112,6 @@
for (int i = 0; i < 200; i++)
{
- System.out.println("Sent " + i);
producer.send(message);
}
15 years