[hornetq-commits] JBoss hornetq SVN: r10676 - in trunk/hornetq-core/src/main/java/org/hornetq/core: config/impl and 12 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue May 17 00:51:32 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-05-17 00:51:31 -0400 (Tue, 17 May 2011)
New Revision: 10676
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/Topology.java
trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/FileConfiguration.java
trunk/hornetq-core/src/main/java/org/hornetq/core/journal/SequentialFile.java
trunk/hornetq-core/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/PrintPages.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageSubscription.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageSyncTimer.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/version/impl/VersionImpl.java
Log:
syncing branches EAP and trunk with changes on core
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -47,7 +47,6 @@
/**
* @return the largeMessageSize
*/
- @Override
public long getLargeMessageSize()
{
return largeMessageSize;
@@ -56,7 +55,6 @@
/**
* @param largeMessageSize the largeMessageSize to set
*/
- @Override
public void setLargeMessageSize(long largeMessageSize)
{
this.largeMessageSize = largeMessageSize;
@@ -92,7 +90,6 @@
return true;
}
- @Override
public void setLargeMessageController(final LargeMessageController controller)
{
largeMessageController = controller;
@@ -112,7 +109,6 @@
return getLongProperty(Message.HDR_LARGE_BODY_SIZE).intValue();
}
- @Override
public LargeMessageController getLargeMessageController()
{
return largeMessageController;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -205,7 +205,6 @@
}
- @Override
public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection) throws HornetQException
{
// Get the connection
@@ -224,28 +223,35 @@
}
- @Override
public TransportConfiguration getConnectorConfiguration()
{
return connectorConfig;
}
- @Override
public void setBackupConnector(TransportConfiguration live, TransportConfiguration backUp)
{
if(live.equals(connectorConfig) && backUp != null)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Setting up backup config = " + backUp + " for live = " + live);
+ }
backupConfig = backUp;
}
+ else
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("ClientSessionFactoryImpl received backup update for live/backup pair = " + live + " / " + backUp + " but it didn't belong to " + this.connectorConfig);
+ }
+ }
}
- @Override
public Object getBackupConnector()
{
return backupConfig;
}
- @Override
public ClientSession createSession(final String username,
final String password,
final boolean xa,
@@ -263,7 +269,6 @@
ackBatchSize);
}
- @Override
public ClientSession createSession(final boolean autoCommitSends,
final boolean autoCommitAcks,
final int ackBatchSize) throws HornetQException
@@ -277,7 +282,6 @@
ackBatchSize);
}
- @Override
public ClientSession createXASession() throws HornetQException
{
return createSessionInternal(null,
@@ -289,7 +293,6 @@
serverLocator.getAckBatchSize());
}
- @Override
public ClientSession createTransactedSession() throws HornetQException
{
return createSessionInternal(null,
@@ -301,7 +304,6 @@
serverLocator.getAckBatchSize());
}
- @Override
public ClientSession createSession() throws HornetQException
{
return createSessionInternal(null,
@@ -313,7 +315,6 @@
serverLocator.getAckBatchSize());
}
- @Override
public ClientSession createSession(final boolean autoCommitSends, final boolean autoCommitAcks) throws HornetQException
{
return createSessionInternal(null,
@@ -325,7 +326,6 @@
serverLocator.getAckBatchSize());
}
- @Override
public ClientSession createSession(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks) throws HornetQException
{
return createSessionInternal(null,
@@ -337,7 +337,6 @@
serverLocator.getAckBatchSize());
}
- @Override
public ClientSession createSession(final boolean xa,
final boolean autoCommitSends,
final boolean autoCommitAcks,
@@ -354,19 +353,16 @@
// ConnectionLifeCycleListener implementation --------------------------------------------------
- @Override
public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
}
- @Override
public void connectionDestroyed(final Object connectionID)
{
handleConnectionFailure(connectionID,
new HornetQException(HornetQException.NOT_CONNECTED, "Channel disconnected"));
}
- @Override
public void connectionException(final Object connectionID, final HornetQException me)
{
handleConnectionFailure(connectionID, me);
@@ -374,7 +370,6 @@
// Must be synchronized to prevent it happening concurrently with failover which can lead to
// inconsistencies
- @Override
public void removeSession(final ClientSessionInternal session, boolean failingOver)
{
synchronized (sessions)
@@ -383,36 +378,30 @@
}
}
- @Override
public void connectionReadyForWrites(final Object connectionID, final boolean ready)
{
}
- @Override
public synchronized int numConnections()
{
return connection != null ? 1 : 0;
}
- @Override
public int numSessions()
{
return sessions.size();
}
- @Override
public void addFailureListener(final SessionFailureListener listener)
{
listeners.add(listener);
}
- @Override
public boolean removeFailureListener(final SessionFailureListener listener)
{
return listeners.remove(listener);
}
- @Override
public void causeExit()
{
exitLoop = true;
@@ -422,7 +411,6 @@
}
}
- @Override
public void close()
{
if (closed)
@@ -461,7 +449,6 @@
closed = true;
}
- @Override
public ServerLocator getServerLocator()
{
return serverLocator;
@@ -901,6 +888,11 @@
return;
}
+ if (log.isDebugEnabled())
+ {
+ log.debug("Trying reconnection attempt " + count);
+ }
+
getConnection();
if (connection == null)
@@ -910,10 +902,10 @@
if (reconnectAttempts != 0)
{
count++;
-
+
if (reconnectAttempts != -1 && count == reconnectAttempts)
{
- log.warn("Tried " + reconnectAttempts + " times to connect. Now giving up.");
+ log.warn("Tried " + reconnectAttempts + " times to connect. Now giving up on reconnecting it.");
return;
}
@@ -994,7 +986,6 @@
}
}
- @Override
public CoreRemotingConnection getConnection()
{
if (connection == null)
@@ -1016,10 +1007,20 @@
{
connector.start();
+ if (log.isDebugEnabled())
+ {
+ log.debug("Trying to connect at the main server using connector :" + connectorConfig);
+ }
+
tc = connector.createConnection();
if (tc == null)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Main server is not up. Hopefully there's a backup configured now!");
+ }
+
try
{
connector.close();
@@ -1031,9 +1032,13 @@
connector = null;
}
}
- //if connection fails we can try the backup incase it has come live
+ //if connection fails we can try the backup in case it has come live
if(connector == null && backupConfig != null)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Trying backup config = " + backupConfig);
+ }
ConnectorFactory backupConnectorFactory = instantiateConnectorFactory(backupConfig.getFactoryClassName());
connector = backupConnectorFactory.createConnector(backupConfig.getParams(),
handler,
@@ -1049,6 +1054,11 @@
if (tc == null)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Backup is not active yet");
+ }
+
try
{
connector.close();
@@ -1062,6 +1072,12 @@
else
{
/*looks like the backup is now live, lets use that*/
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Connected to the backup at " + backupConfig);
+ }
+
connectorConfig = backupConfig;
backupConfig = null;
@@ -1155,7 +1171,6 @@
return connection;
}
- @Override
public void finalize() throws Throwable
{
if (!closed)
@@ -1175,7 +1190,6 @@
{
return AccessController.doPrivileged(new PrivilegedAction<ConnectorFactory>()
{
- @Override
public ConnectorFactory run()
{
ClassLoader loader = Thread.currentThread().getContextClassLoader();
@@ -1241,7 +1255,6 @@
this.conn = conn;
}
- @Override
public void handlePacket(final Packet packet)
{
final byte type = packet.getType();
@@ -1254,7 +1267,6 @@
{
// Must be executed on new thread since cannot block the netty thread for a long time and fail can
// cause reconnect loop
- @Override
public void run()
{
SimpleString nodeID = msg.getNodeID();
@@ -1275,10 +1287,18 @@
if (topMessage.isExit())
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Notifying " + topMessage.getNodeID() + " going down");
+ }
serverLocator.notifyNodeDown(topMessage.getNodeID());
}
else
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Node " + topMessage.getNodeID() + " going up, connector = " + topMessage.getPair() + ", isLast=" + topMessage.isLast());
+ }
serverLocator.notifyNodeUp(topMessage.getNodeID(),
topMessage.getPair(),
topMessage.isLast());
@@ -1289,7 +1309,6 @@
private class DelegatingBufferHandler implements BufferHandler
{
- @Override
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
CoreRemotingConnection theConn = connection;
@@ -1310,7 +1329,6 @@
this.connectionID = connectionID;
}
- @Override
public void connectionFailed(final HornetQException me, boolean failedOver)
{
handleConnectionFailure(connectionID, me);
@@ -1326,7 +1344,6 @@
pingRunnable = new WeakReference<PingRunnable>(runnable);
}
- @Override
public void run()
{
PingRunnable runnable = pingRunnable.get();
@@ -1347,7 +1364,6 @@
private long lastCheck = System.currentTimeMillis();
- @Override
public synchronized void run()
{
if (cancelled || stopPingingAfterOne && !first)
@@ -1371,7 +1387,6 @@
threadPool.execute(new Runnable()
{
// Must be executed on different thread
- @Override
public void run()
{
connection.fail(me);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/Topology.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/Topology.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/Topology.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -46,7 +46,7 @@
*/
private Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
- private boolean debug;
+ private boolean debug = log.isDebugEnabled();
public synchronized boolean addMember(String nodeId, TopologyMember member)
{
@@ -54,9 +54,9 @@
TopologyMember currentMember = topology.get(nodeId);
if (debug)
{
- System.out.println("adding = " + nodeId + ":" + member.getConnector());
- System.out.println("before----------------------------------");
- System.out.println(describe());
+ log.info("adding = " + nodeId + ":" + member.getConnector());
+ log.info("before----------------------------------");
+ log.info(describe());
}
if(currentMember == null)
{
@@ -87,9 +87,8 @@
}
if(debug)
{
-
- System.out.println("after----------------------------------updated=" + replaced);
- System.out.println(describe());
+ log.info("Topology updated=" + replaced);
+ log.info(describe());
}
return replaced;
}
@@ -97,6 +96,10 @@
public synchronized boolean removeMember(String nodeId)
{
TopologyMember member = topology.remove(nodeId);
+ if (debug)
+ {
+ log.info("Removing member " + member);
+ }
return (member != null);
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/ConfigurationImpl.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/ConfigurationImpl.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -157,7 +157,7 @@
public static final int DEFAULT_ID_CACHE_SIZE = 2000;
public static final boolean DEFAULT_PERSIST_ID_CACHE = true;
-
+
public static final boolean DEFAULT_CLUSTER_DUPLICATE_DETECTION = true;
public static final boolean DEFAULT_CLUSTER_FORWARD_WHEN_NO_CONSUMERS = false;
@@ -185,7 +185,7 @@
public static final String DEFAULT_LOG_DELEGATE_FACTORY_CLASS_NAME = JULLogDelegateFactory.class.getCanonicalName();
// Attributes -----------------------------------------------------------------------------
-
+
protected String name = "ConfigurationImpl::" + System.identityHashCode(this);
protected boolean clustered = ConfigurationImpl.DEFAULT_CLUSTERED;
@@ -231,7 +231,7 @@
protected String logDelegateFactoryClassName = ConfigurationImpl.DEFAULT_LOG_DELEGATE_FACTORY_CLASS_NAME;
protected List<String> interceptorClassNames = new ArrayList<String>();
-
+
protected Map<String, TransportConfiguration> connectorConfigs = new HashMap<String, TransportConfiguration>();
protected Set<TransportConfiguration> acceptorConfigs = new HashSet<TransportConfiguration>();
@@ -341,67 +341,56 @@
// Public -------------------------------------------------------------------------
- @Override
public boolean isClustered()
{
return clustered;
}
- @Override
public void setClustered(final boolean clustered)
{
this.clustered = clustered;
}
- @Override
public boolean isAllowAutoFailBack()
{
return allowAutoFailBack;
}
- @Override
public void setAllowAutoFailBack(boolean allowAutoFailBack)
{
this.allowAutoFailBack = allowAutoFailBack;
}
- @Override
public boolean isBackup()
{
return backup;
}
- @Override
public boolean isFileDeploymentEnabled()
{
return fileDeploymentEnabled;
}
- @Override
public void setFileDeploymentEnabled(final boolean enable)
{
fileDeploymentEnabled = enable;
}
- @Override
public boolean isPersistenceEnabled()
{
return persistenceEnabled;
}
- @Override
public void setPersistenceEnabled(final boolean enable)
{
persistenceEnabled = enable;
}
- @Override
public long getFileDeployerScanPeriod()
{
return fileDeploymentScanPeriod;
}
- @Override
public void setFileDeployerScanPeriod(final long period)
{
fileDeploymentScanPeriod = period;
@@ -410,109 +399,91 @@
/**
* @return the persistDeliveryCountBeforeDelivery
*/
- @Override
public boolean isPersistDeliveryCountBeforeDelivery()
{
return persistDeliveryCountBeforeDelivery;
}
- @Override
public void setPersistDeliveryCountBeforeDelivery(final boolean persistDeliveryCountBeforeDelivery)
{
this.persistDeliveryCountBeforeDelivery = persistDeliveryCountBeforeDelivery;
}
- @Override
public void setBackup(final boolean backup)
{
this.backup = backup;
}
- @Override
public boolean isSharedStore()
{
return sharedStore;
}
- @Override
public void setSharedStore(final boolean sharedStore)
{
this.sharedStore = sharedStore;
}
- @Override
public int getScheduledThreadPoolMaxSize()
{
return scheduledThreadPoolMaxSize;
}
- @Override
public void setScheduledThreadPoolMaxSize(final int maxSize)
{
scheduledThreadPoolMaxSize = maxSize;
}
- @Override
public int getThreadPoolMaxSize()
{
return threadPoolMaxSize;
}
- @Override
public void setThreadPoolMaxSize(final int maxSize)
{
threadPoolMaxSize = maxSize;
}
- @Override
public long getSecurityInvalidationInterval()
{
return securityInvalidationInterval;
}
- @Override
public void setSecurityInvalidationInterval(final long interval)
{
securityInvalidationInterval = interval;
}
- @Override
public long getConnectionTTLOverride()
{
return connectionTTLOverride;
}
- @Override
public void setConnectionTTLOverride(final long ttl)
{
connectionTTLOverride = ttl;
}
- @Override
public boolean isAsyncConnectionExecutionEnabled()
{
return asyncConnectionExecutionEnabled;
}
- @Override
public void setEnabledAsyncConnectionExecution(final boolean enabled)
{
asyncConnectionExecutionEnabled = enabled;
}
- @Override
public List<String> getInterceptorClassNames()
{
return interceptorClassNames;
}
- @Override
public void setInterceptorClassNames(final List<String> interceptors)
{
interceptorClassNames = interceptors;
}
- @Override
public Set<TransportConfiguration> getAcceptorConfigurations()
{
return acceptorConfigs;
@@ -528,31 +499,26 @@
return connectorConfigs;
}
- @Override
public void setConnectorConfigurations(final Map<String, TransportConfiguration> infos)
{
connectorConfigs = infos;
}
- @Override
public String getLiveConnectorName()
{
return liveConnectorName;
}
- @Override
public void setLiveConnectorName(final String liveConnectorName)
{
this.liveConnectorName = liveConnectorName;
}
-
- @Override
+
public GroupingHandlerConfiguration getGroupingHandlerConfiguration()
{
return groupingHandlerConfiguration;
}
- @Override
public void setGroupingHandlerConfiguration(final GroupingHandlerConfiguration groupingHandlerConfiguration)
{
this.groupingHandlerConfiguration = groupingHandlerConfiguration;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/FileConfiguration.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/FileConfiguration.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -38,7 +38,7 @@
private static final String DEFAULT_CONFIGURATION_URL = "hornetq-configuration.xml";
- // For a bridge confirmations must be activated or sent acknowledgments won't return
+ // For a bridge confirmations must be activated or send acknowledgments won't return
public static final int DEFAULT_CONFIRMATION_WINDOW_SIZE = 1024 * 1024;
// Static --------------------------------------------------------------------------
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/journal/SequentialFile.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/journal/SequentialFile.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -94,6 +94,8 @@
void renameTo(String newFileName) throws Exception;
SequentialFile copy();
+
+ void copyTo(SequentialFile newFileName) throws Exception;
void setTimedBuffer(TimedBuffer buffer);
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -104,6 +104,29 @@
file.delete();
}
+
+ public void copyTo(SequentialFile newFileName) throws Exception
+ {
+ log.debug("Copying " + this + " as " + newFileName);
+ newFileName.open();
+ this.open();
+
+
+ ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
+
+ for (;;)
+ {
+ buffer.rewind();
+ int size = this.read(buffer);
+ newFileName.writeInternal(buffer);
+ if (size < 10 * 1024)
+ {
+ break;
+ }
+ }
+ newFileName.close();
+ this.close();
+ }
public void position(final long pos) throws Exception
{
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/PrintPages.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/PrintPages.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/PrintPages.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -26,6 +26,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -35,11 +36,13 @@
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -71,11 +74,14 @@
if (arg.length != 2)
{
System.err.println("Usage: PrintPages <page foler> <journal folder>");
+ System.exit(-1);
}
try
{
- Map<Long, Set<PagePosition>> cursorACKs = PrintPages.loadCursorACKs(arg[1]);
+ Pair<Map<Long, Set<PagePosition>>, Set<Long>> cursorACKs = PrintPages.loadCursorACKs(arg[1]);
+
+ Set<Long> pgTXs = cursorACKs.b;
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
final ExecutorService executor = Executors.newFixedThreadPool(10);
@@ -116,7 +122,7 @@
for (PagedMessage msg : msgs)
{
msg.initMessage(sm);
- System.out.print("pg=" + pg + ", msg=" + msgID + "=" + msg.getMessage());
+ System.out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ", msg=" + msg.getMessage());
System.out.print(",Queues = ");
long q[] = msg.getQueueIDs();
for (int i = 0; i < q.length; i++)
@@ -127,7 +133,7 @@
boolean acked = false;
- Set<PagePosition> positions = cursorACKs.get(q[i]);
+ Set<PagePosition> positions = cursorACKs.a.get(q[i]);
if (positions != null)
{
acked = positions.contains(posCheck);
@@ -143,6 +149,10 @@
System.out.print(",");
}
}
+ if (msg.getTransactionID() >= 0 && !pgTXs.contains(msg.getTransactionID()))
+ {
+ System.out.print(", **PG_TX_NOT_FOUND**");
+ }
System.out.println();
msgID++;
}
@@ -164,7 +174,7 @@
* @return
* @throws Exception
*/
- protected static Map<Long, Set<PagePosition>> loadCursorACKs(final String journalLocation) throws Exception
+ protected static Pair<Map<Long, Set<PagePosition>>, Set<Long>> loadCursorACKs(final String journalLocation) throws Exception
{
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation);
@@ -188,14 +198,17 @@
messagesJournal.load(records, txs, null);
Map<Long, Set<PagePosition>> cursorRecords = new HashMap<Long, Set<PagePosition>>();
+
+ Set<Long> pgTXs = new HashSet<Long>();
for (RecordInfo record : records)
{
+ byte[] data = record.data;
+
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+
if (record.userRecordType == JournalStorageManager.ACKNOWLEDGE_CURSOR)
{
- byte[] data = record.data;
-
- HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
@@ -209,8 +222,28 @@
set.add(encoding.position);
}
+ else if (record.userRecordType == JournalStorageManager.PAGE_TRANSACTION)
+ {
+ if (record.isUpdate)
+ {
+ PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
+
+ pageUpdate.decode(buff);
+ pgTXs.add(pageUpdate.pageTX);
+ }
+ else
+ {
+ PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+
+ pageTransactionInfo.decode(buff);
+
+ pageTransactionInfo.setRecordID(record.id);
+ pgTXs.add(pageTransactionInfo.getTransactionID());
+ }
+ }
}
- return cursorRecords;
+
+ return new Pair<Map<Long, Set<PagePosition>>, Set<Long>>(cursorRecords, pgTXs);
}
// Package protected ---------------------------------------------
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageSubscription.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageSubscription.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -13,6 +13,8 @@
package org.hornetq.core.paging.cursor;
+import java.util.concurrent.Executor;
+
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.server.Queue;
@@ -131,4 +133,9 @@
* @return
*/
PagedMessage queryMessage(PagePosition pos);
+
+ /**
+ * @return
+ */
+ Executor getExecutor();
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -14,8 +14,10 @@
package org.hornetq.core.paging.cursor;
import java.lang.ref.WeakReference;
+import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.Message;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -34,6 +36,10 @@
private static final long serialVersionUID = -8640232251318264710L;
+ private static final Logger log = Logger.getLogger(PagedReferenceImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
+
private final PagePosition position;
private WeakReference<PagedMessage> message;
@@ -42,6 +48,8 @@
private int persistedCount;
+ private AtomicInteger deliveryCount = new AtomicInteger(0);
+
private final PageSubscription subscription;
public ServerMessage getMessage()
@@ -84,12 +92,12 @@
{
return true;
}
-
+
public void setPersistedCount(int count)
{
this.persistedCount = count;
}
-
+
public int getPersistedCount()
{
return persistedCount;
@@ -100,8 +108,7 @@
*/
public MessageReference copy(final Queue queue)
{
- // TODO Auto-generated method stub
- return null;
+ return new PagedReferenceImpl(this.position, this.getPagedMessage(), this.subscription);
}
/* (non-Javadoc)
@@ -137,8 +144,7 @@
*/
public int getDeliveryCount()
{
- // TODO Auto-generated method stub
- return 0;
+ return deliveryCount.get();
}
/* (non-Javadoc)
@@ -146,8 +152,7 @@
*/
public void setDeliveryCount(final int deliveryCount)
{
- // TODO Auto-generated method stub
-
+ this.deliveryCount.set(deliveryCount);
}
/* (non-Javadoc)
@@ -155,7 +160,11 @@
*/
public void incrementDeliveryCount()
{
- // TODO Auto-generated method stub
+ deliveryCount.incrementAndGet();
+ if (isTrace)
+ {
+ log.trace("deliveryCount = " + deliveryCount + " for " + this);
+ }
}
@@ -164,8 +173,7 @@
*/
public void decrementDeliveryCount()
{
- // TODO Auto-generated method stub
-
+ deliveryCount.decrementAndGet();
}
/* (non-Javadoc)
@@ -199,4 +207,25 @@
{
subscription.ackTx(tx, this);
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "PagedReferenceImpl [position=" + position +
+ ", message=" +
+ message +
+ ", deliveryTime=" +
+ deliveryTime +
+ ", persistedCount=" +
+ persistedCount +
+ ", deliveryCount=" +
+ deliveryCount +
+ ", subscription=" +
+ subscription +
+ "]";
+ }
+
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -30,7 +30,6 @@
import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Future;
import org.hornetq.utils.SoftValueHashMap;
import org.jboss.netty.util.internal.ConcurrentHashMap;
@@ -57,8 +56,7 @@
private final StorageManager storageManager;
- private final ExecutorFactory executorFactory;
-
+ // This is the same executor used at the PageStoreImpl. One Executor per pageStore
private final Executor executor;
private final SoftValueHashMap<Long, PageCache> softCache;
@@ -71,13 +69,12 @@
public PageCursorProviderImpl(final PagingStore pagingStore,
final StorageManager storageManager,
- final ExecutorFactory executorFactory,
+ final Executor executor,
final int maxCacheSize)
{
this.pagingStore = pagingStore;
this.storageManager = storageManager;
- this.executorFactory = executorFactory;
- this.executor = executorFactory.getExecutor();
+ this.executor = executor;
this.softCache = new SoftValueHashMap<Long, PageCache>(maxCacheSize);
}
@@ -96,13 +93,7 @@
throw new IllegalStateException("Cursor " + cursorID + " had already been created");
}
- activeCursor = new PageSubscriptionImpl(this,
- pagingStore,
- storageManager,
- executorFactory.getExecutor(),
- filter,
- cursorID,
- persistent);
+ activeCursor = new PageSubscriptionImpl(this, pagingStore, storageManager, executor, filter, cursorID, persistent);
activeCursors.put(cursorID, activeCursor);
return activeCursor;
}
@@ -389,6 +380,17 @@
{
pagingStore.stopPaging();
}
+ else
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("Couldn't cleanup page on address " + this.pagingStore.getAddress() +
+ " as numberOfPages == " +
+ pagingStore.getNumberOfPages() +
+ " and currentPage.numberOfMessages = " +
+ pagingStore.getCurrentPage().getNumberOfMessages());
+ }
+ }
}
catch (Exception ex)
{
@@ -411,7 +413,7 @@
{
cache = softCache.remove((long)depagedPage.getPageId());
}
-
+
if (cache == null)
{
// The page is not on cache any more
@@ -426,7 +428,7 @@
{
pgdMessages = cache.getMessages();
}
-
+
depagedPage.delete(pgdMessages);
synchronized (softCache)
{
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.MessageReference;
@@ -40,6 +41,8 @@
// Constants -----------------------------------------------------
static final Logger log = Logger.getLogger(PageSubscriptionCounterImpl.class);
+
+ static final boolean isTrace = log.isTraceEnabled();
// Attributes ----------------------------------------------------
@@ -51,8 +54,12 @@
private long recordID = -1;
private boolean persistent;
+
+ private final PageSubscription subscription;
private final StorageManager storage;
+
+ private final Executor executor;
private final AtomicLong value = new AtomicLong(0);
@@ -60,8 +67,6 @@
private LinkedList<Pair<Long, Integer>> loadList;
- private final Executor executor;
-
private final Runnable cleanupCheck = new Runnable()
{
public void run()
@@ -77,14 +82,16 @@
// Constructors --------------------------------------------------
public PageSubscriptionCounterImpl(final StorageManager storage,
+ final PageSubscription subscription,
+ final Executor executor,
final boolean persistent,
- final long subscriptionID,
- final Executor executor)
+ final long subscriptionID)
{
this.subscriptionID = subscriptionID;
+ this.executor = executor;
this.storage = storage;
- this.executor = executor;
this.persistent = persistent;
+ this.subscription = subscription;
}
/* (non-Javadoc)
@@ -253,10 +260,13 @@
}
newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace);
+
+ if (isTrace)
+ {
+ log.trace("Replacing page-counter record = " + recordID + " by record = " + newRecordID + " on subscriptionID = " + this.subscriptionID + " for queue = " + this.subscription.getQueue().getName());
+ }
storage.commit(txCleanup);
-
- storage.waitOnOperations();
}
catch (Exception e)
{
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -92,8 +92,6 @@
private final PageCursorProvider cursorProvider;
- private final Executor executor;
-
private volatile PagePosition lastAckedPosition;
private List<PagePosition> recoveredACK;
@@ -101,6 +99,8 @@
private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
private final PageSubscriptionCounter counter;
+
+ private final Executor executor;
private final AtomicLong deliveredCount = new AtomicLong(0);
@@ -126,7 +126,7 @@
this.executor = executor;
this.filter = filter;
this.persistent = persistent;
- this.counter = new PageSubscriptionCounterImpl(store, persistent, cursorId, executor);
+ this.counter = new PageSubscriptionCounterImpl(store, this, executor, persistent, cursorId);
}
// Public --------------------------------------------------------
@@ -224,7 +224,7 @@
// First get the completed pages using a lock
synchronized (this)
{
- for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
+ for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
{
PageCursorInfo info = entry.getValue();
if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
@@ -687,6 +687,14 @@
}
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageSubscription#executeWithContext(java.lang.Runnable)
+ */
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+
private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
{
return getPageInfo(pos, true);
@@ -734,8 +742,17 @@
{
if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
{
+ if (isTrace)
+ {
+ log.trace("a new position is being processed as ACK");
+ }
if (lastAckedPosition != null && lastAckedPosition.getPageNr() != pos.getPageNr())
{
+ if (isTrace)
+ {
+ log.trace("Scheduling cleanup on pageSubscription for address = " + pageStore.getAddress() + " queue = " + this.getQueue().getName());
+ }
+
// there's a different page being acked, we will do the check right away
if (autoCleanup)
{
@@ -780,7 +797,7 @@
private PageTransactionInfo getPageTransaction(final PagedReference reference)
{
- if (reference.getPagedMessage().getTransactionID() != 0)
+ if (reference.getPagedMessage().getTransactionID() >= 0)
{
return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID());
}
@@ -816,6 +833,7 @@
private final long pageId;
+ // TODO: Merge removedReferences and acks into a single structure
// Confirmed ACKs on this page
private final Set<PagePosition> acks = Collections.synchronizedSet(new LinkedHashSet<PagePosition>());
@@ -1135,6 +1153,13 @@
{
ignored = true;
}
+
+ PageCursorInfo info = getPageInfo(message.getPosition(), false);
+
+ if (info != null && info.isRemoved(message.getPosition()))
+ {
+ continue;
+ }
// 2nd ... if TX, is it committed?
if (valid && message.getPagedMessage().getTransactionID() >= 0)
@@ -1145,7 +1170,7 @@
{
log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
", ignoring message on position " +
- message.getPosition());
+ message.getPosition() + " on address=" + pageStore.getAddress() + " queue=" + queue.getName());
valid = false;
ignored = true;
}
@@ -1166,7 +1191,7 @@
// Say you have a Browser that will only read the files... there's no need to control PageCursors is
// nothing
// is being changed. That's why the false is passed as a parameter here
- PageCursorInfo info = getPageInfo(message.getPosition(), false);
+
if (info != null && info.isRemoved(message.getPosition()))
{
valid = false;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageSyncTimer.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageSyncTimer.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageSyncTimer.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -19,6 +19,7 @@
import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.OperationContext;
@@ -34,6 +35,9 @@
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PageSyncTimer.class);
+
+
// Attributes ----------------------------------------------------
private final PagingStore store;
@@ -83,6 +87,7 @@
OperationContext [] pendingSyncsArray;
synchronized (this)
{
+
pendingSync = false;
pendingSyncsArray = new OperationContext[syncOperations.size()];
pendingSyncsArray = syncOperations.toArray(pendingSyncsArray);
@@ -91,7 +96,10 @@
try
{
- store.ioSync();
+ if (pendingSyncsArray.length != 0)
+ {
+ store.ioSync();
+ }
}
catch (Exception e)
{
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -84,6 +84,7 @@
HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(largeMessageLazyData);
message.decodeHeadersAndProperties(buffer);
lgMessage.incrementDelayDeletionCount();
+ lgMessage.setPaged();
largeMessageLazyData = null;
}
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -106,12 +106,11 @@
syncTimeout,
pagingManager,
storageManager,
- postOffice,
null,
this,
address,
settings,
- executorFactory,
+ executorFactory.getExecutor(),
syncNonTransactional);
}
@@ -212,12 +211,11 @@
syncTimeout,
pagingManager,
storageManager,
- postOffice,
factory,
this,
address,
settings,
- executorFactory,
+ executorFactory.getExecutor(),
syncNonTransactional);
storesReturn.add(store);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -46,6 +46,7 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
@@ -80,8 +81,6 @@
private final StorageManager storageManager;
- private final PostOffice postOffice;
-
private final DecimalFormat format = new DecimalFormat("000000000");
private final AtomicInteger currentPageSize = new AtomicInteger(0);
@@ -148,12 +147,11 @@
final long syncTimeout,
final PagingManager pagingManager,
final StorageManager storageManager,
- final PostOffice postOffice,
final SequentialFileFactory fileFactory,
final PagingStoreFactory storeFactory,
final SimpleString storeName,
final AddressSettings addressSettings,
- final ExecutorFactory executorFactory,
+ final Executor executor,
final boolean syncNonTransactional)
{
if (pagingManager == null)
@@ -165,8 +163,6 @@
this.storageManager = storageManager;
- this.postOffice = postOffice;
-
this.storeName = storeName;
applySetting(addressSettings);
@@ -181,7 +177,7 @@
pageSize);
}
- this.executor = executorFactory.getExecutor();
+ this.executor = executor;
this.pagingManager = pagingManager;
@@ -200,7 +196,7 @@
this.syncTimer = null;
}
- this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory, addressSettings.getPageCacheMaxSize());
+ this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executor, addressSettings.getPageCacheMaxSize());
}
@@ -870,6 +866,11 @@
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), tx == null ? -1 : tx.getID());
+
+ if (message.isLargeMessage())
+ {
+ ((LargeServerMessage)message).setPaged();
+ }
int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
@@ -879,20 +880,19 @@
openNewPage();
currentPageSize.addAndGet(bytesToWrite);
}
-
- installPageTransaction(tx, listCtx, currentPage.getPageId());
currentPage.write(pagedMessage);
-
- if (sync || tx != null)
- {
- sync();
- }
if (tx != null)
{
+ installPageTransaction(tx, listCtx);
tx.setWaitBeforeCommit(true);
}
+ else
+ if (sync && tx == null)
+ {
+ sync();
+ }
return true;
}
@@ -924,38 +924,46 @@
return ids;
}
- private PageTransactionInfo installPageTransaction(final Transaction tx, final RouteContextList listCtx, int pageID) throws Exception
+ private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception
{
- if (tx == null)
+ FinishPageMessageOperation pgOper = (FinishPageMessageOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+ if (pgOper == null)
{
- return null;
+ PageTransactionInfo pgTX = new PageTransactionInfoImpl(tx.getID());
+ pagingManager.addTransaction(pgTX);
+ pgOper = new FinishPageMessageOperation(pgTX, storageManager, pagingManager);
+ tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgOper);
+ tx.addOperation(pgOper);
}
- else
- {
- PageTransactionInfo pgTX = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
- if (pgTX == null)
- {
- pgTX = new PageTransactionInfoImpl(tx.getID());
- pagingManager.addTransaction(pgTX);
- tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgTX);
- tx.addOperation(new FinishPageMessageOperation(pgTX));
- }
- pgTX.increment(listCtx.getNumberOfQueues());
+ pgOper.addStore(this);
+ pgOper.pageTransaction.increment(listCtx.getNumberOfQueues());
- return pgTX;
- }
+ return;
}
- private class FinishPageMessageOperation implements TransactionOperation
+ private static class FinishPageMessageOperation implements TransactionOperation
{
- private final PageTransactionInfo pageTransaction;
+ public final PageTransactionInfo pageTransaction;
+
+ private final StorageManager storageManager;
+
+ private final PagingManager pagingManager;
+
+ private final Set<PagingStore> usedStores = new HashSet<PagingStore>();
private boolean stored = false;
+
+ public void addStore(PagingStore store)
+ {
+ this.usedStores.add(store);
+ }
- public FinishPageMessageOperation(final PageTransactionInfo pageTransaction)
+ public FinishPageMessageOperation(final PageTransactionInfo pageTransaction, final StorageManager storageManager, final PagingManager pagingManager)
{
this.pageTransaction = pageTransaction;
+ this.storageManager = storageManager;
+ this.pagingManager = pagingManager;
}
public void afterCommit(final Transaction tx)
@@ -984,11 +992,24 @@
public void beforeCommit(final Transaction tx) throws Exception
{
+ syncStore();
storePageTX(tx);
}
+ /**
+ * @throws Exception
+ */
+ private void syncStore() throws Exception
+ {
+ for (PagingStore store : usedStores)
+ {
+ store.sync();
+ }
+ }
+
public void beforePrepare(final Transaction tx) throws Exception
{
+ syncStore();
storePageTX(tx);
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -2402,7 +2402,7 @@
}
}
- private static class PageUpdateTXEncoding implements EncodingSupport
+ public static class PageUpdateTXEncoding implements EncodingSupport
{
public long pageTX;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -18,6 +18,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
@@ -48,6 +49,8 @@
private final JournalStorageManager storageManager;
private LargeServerMessage linkMessage;
+
+ private boolean paged;
// We should only use the NIO implementation on the Journal
private SequentialFile file;
@@ -82,6 +85,11 @@
// Public --------------------------------------------------------
+ public void setPaged()
+ {
+ paged = true;
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.server.LargeServerMessage#addBytes(byte[])
*/
@@ -260,27 +268,69 @@
}
}
}
+
+ public void setOriginalHeaders(final ServerMessage other, final boolean expiry)
+ {
+ super.setOriginalHeaders(other, expiry);
+
+ LargeServerMessageImpl otherLM = (LargeServerMessageImpl)other;
+ this.paged = otherLM.paged;
+ if (this.paged)
+ {
+ this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
+ }
+ }
+
+
@Override
public synchronized ServerMessage copy(final long newID)
{
- incrementDelayDeletionCount();
-
- long idToUse = messageID;
-
- if (linkMessage != null)
+ if (!paged)
{
- idToUse = linkMessage.getMessageID();
+ incrementDelayDeletionCount();
+
+ long idToUse = messageID;
+
+ if (linkMessage != null)
+ {
+ idToUse = linkMessage.getMessageID();
+ }
+
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
+
+ ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+ : (LargeServerMessageImpl)linkMessage,
+ newfile,
+ newID);
+ return newMessage;
}
-
- SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
-
- ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
- : (LargeServerMessageImpl)linkMessage,
- newfile,
- newID);
-
- return newMessage;
+ else
+ {
+ try
+ {
+ validateFile();
+
+ SequentialFile file = this.file;
+
+ SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
+
+ file.copyTo(newFile);
+
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
+
+ newMessage.linkMessage = null;
+
+ newMessage.setPaged();
+
+ return newMessage;
+ }
+ catch (Exception e)
+ {
+ log.warn("Error on copying large message this for DLA or Expiry", e);
+ return null;
+ }
+ }
}
public SequentialFile getFile()
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -157,7 +157,14 @@
return "LargeServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.LargeServerMessage#setPaged()
+ */
+ public void setPaged()
+ {
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -267,7 +267,6 @@
configuration);
}
- @Override
public synchronized void start() throws Exception
{
if (channelFactory != null)
@@ -330,7 +329,6 @@
ChannelPipelineFactory factory = new ChannelPipelineFactory()
{
- @Override
public ChannelPipeline getPipeline() throws Exception
{
Map<String, ChannelHandler> handlers = new LinkedHashMap<String, ChannelHandler>();
@@ -349,7 +347,7 @@
if (httpEnabled)
{
handlers.put("http-decoder", new HttpRequestDecoder());
-
+
handlers.put("http-aggregator", new HttpChunkAggregator(Integer.MAX_VALUE));
handlers.put("http-encoder", new HttpResponseEncoder());
@@ -486,7 +484,6 @@
}
}
- @Override
public synchronized void stop()
{
if (channelFactory == null)
@@ -562,13 +559,11 @@
paused = false;
}
- @Override
public boolean isStarted()
{
return channelFactory != null;
}
- @Override
public void pause()
{
if (paused)
@@ -611,7 +606,6 @@
paused = true;
}
- @Override
public void setNotificationService(final NotificationService notificationService)
{
this.notificationService = notificationService;
@@ -638,7 +632,6 @@
{
sslHandler.handshake().addListener(new ChannelFutureListener()
{
- @Override
public void operationComplete(final ChannelFuture future) throws Exception
{
if (future.isSuccess())
@@ -661,7 +654,6 @@
private class Listener implements ConnectionLifeCycleListener
{
- @Override
public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
if (connections.putIfAbsent(connection.getID(), (NettyConnection)connection) != null)
@@ -672,7 +664,6 @@
listener.connectionCreated(connection, NettyAcceptor.this.protocol);
}
- @Override
public void connectionDestroyed(final Object connectionID)
{
if (connections.remove(connectionID) != null)
@@ -681,7 +672,6 @@
}
}
- @Override
public void connectionException(final Object connectionID, final HornetQException me)
{
// Execute on different thread to avoid deadlocks
@@ -696,23 +686,21 @@
}
- @Override
public void connectionReadyForWrites(final Object connectionID, boolean ready)
{
NettyConnection conn = connections.get(connectionID);
-
+
if (conn != null)
{
conn.fireReady(ready);
- }
- }
+ }
+ }
}
private class BatchFlusher implements Runnable
{
private boolean cancelled;
- @Override
public synchronized void run()
{
if (!cancelled)
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -30,7 +30,13 @@
void setLinkedMessage(LargeServerMessage message);
boolean isFileExists() throws Exception;
-
+
+ /**
+ * We have to copy the large message content in case of DLQ and paged messages
+ * For that we need to pre-mark the LargeMessage with a flag when it is paged
+ */
+ void setPaged();
+
/** Close the files if opened */
void releaseResources();
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -72,6 +72,8 @@
public class QueueImpl implements Queue
{
private static final Logger log = Logger.getLogger(QueueImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
@@ -119,6 +121,8 @@
private final Runnable deliverRunner = new DeliverRunner();
+ private volatile boolean depagePending = false;
+
private final Runnable depageRunner = new DepageRunner();
private final StorageManager storageManager;
@@ -396,7 +400,7 @@
public void deliverAsync()
{
- executor.execute(deliverRunner);
+ getExecutor().execute(deliverRunner);
}
public void close() throws Exception
@@ -411,7 +415,15 @@
public Executor getExecutor()
{
- return executor;
+ if (pageSubscription != null && pageSubscription.isPaging())
+ {
+ // When in page mode, we don't want to have concurrent IO on the same PageStore
+ return pageSubscription.getExecutor();
+ }
+ else
+ {
+ return executor;
+ }
}
/* Only used on tests */
@@ -432,7 +444,7 @@
if (!ok)
{
- log.warn("Couldn't finish waiting executors. Try increasing the thread pool size");
+ log.warn("Couldn't finish waiting executors. Try increasing the thread pool size", new Exception ("trace"));
}
return ok;
@@ -827,10 +839,18 @@
{
if (expiryAddress != null)
{
+ if (isTrace)
+ {
+ log.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName(), new Exception ("trace"));
+ }
move(expiryAddress, ref, true, false);
}
else
{
+ if (isTrace)
+ {
+ log.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName());
+ }
acknowledge(ref);
}
}
@@ -1440,6 +1460,8 @@
int numRefs = messageReferences.size();
int handled = 0;
+
+ long timeout = System.currentTimeMillis() + 1000;
while (handled < numRefs)
{
@@ -1451,6 +1473,19 @@
return;
}
+
+ if (pageSubscription != null && pageSubscription.isPaging() && System.currentTimeMillis() > timeout)
+ {
+ if (isTrace)
+ {
+ log.trace("Page delivery has been running for too long. Scheduling another delivery task now");
+ }
+
+ deliverAsync();
+
+ return;
+ }
+
ConsumerHolder holder = consumerList.get(pos);
@@ -1549,7 +1584,7 @@
}
}
- if (pageIterator != null && messageReferences.size() == 0 && pageIterator.hasNext())
+ if (pageIterator != null && messageReferences.size() == 0 && pageSubscription.isPaging() && pageIterator.hasNext())
{
scheduleDepage();
}
@@ -1580,12 +1615,20 @@
private void scheduleDepage()
{
- executor.execute(depageRunner);
+ if (!depagePending)
+ {
+ if (isTrace)
+ {
+ log.trace("Scheduling depage for queue " + this.getName());
+ }
+ depagePending = true;
+ pageSubscription.getExecutor().execute(depageRunner);
+ }
}
private void depage()
{
- if (paused || pageIterator == null || consumerList.isEmpty())
+ if (paused || pageIterator == null)
{
return;
}
@@ -1593,13 +1636,15 @@
long maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
// System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
+ int depaged = 0;
while (queueMemorySize.get() < maxSize && pageIterator.hasNext())
{
+ depaged++;
PagedReference reference = pageIterator.next();
addTail(reference, false);
pageIterator.remove();
}
- // System.out.println("QueueMemorySize after depage = " + queueMemorySize.get() + " depaged " + nmessages);
+ log.debug("Queue Memory Size after depage on queue="+this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages");
deliverAsync();
}
@@ -1629,11 +1674,13 @@
if (internalQueue)
{
+ if (isTrace)
+ {
+ log.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery");
+ }
// no DLQ check on internal queues
return true;
}
-
- // TODO: DeliveryCount on paging
if (!internalQueue && message.isDurable() && durable && !reference.isPaged())
{
@@ -1647,7 +1694,11 @@
// First check DLA
if (maxDeliveries > 0 && reference.getDeliveryCount() >= maxDeliveries)
{
- sendToDeadLetterAddress(reference);
+ if (isTrace)
+ {
+ log.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() + " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName());
+ }
+ sendToDeadLetterAddress(reference, addressSettings.getDeadLetterAddress());
return false;
}
@@ -1658,6 +1709,10 @@
if (redeliveryDelay > 0)
{
+ if (isTrace)
+ {
+ log.trace("Setting redeliveryDelay=" + redeliveryDelay + " on reference=" + reference);
+ }
reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
if (message.isDurable() && durable)
@@ -1738,10 +1793,14 @@
}
}
+
private void sendToDeadLetterAddress(final MessageReference ref) throws Exception
{
- SimpleString deadLetterAddress = addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress();
-
+ sendToDeadLetterAddress(ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
+ }
+
+ private void sendToDeadLetterAddress(final MessageReference ref, final SimpleString deadLetterAddress) throws Exception
+ {
if (deadLetterAddress != null)
{
Bindings bindingList = postOffice.getBindingsForAddress(deadLetterAddress);
@@ -1859,6 +1918,10 @@
{
if (reference.getMessage().isExpired())
{
+ if (isTrace)
+ {
+ log.trace("Reference " + reference + " is expired");
+ }
reference.handled();
try
@@ -1941,7 +2004,7 @@
// ack isn't committed, then the server crashes and on
// recovery the message is deleted even though the other ack never committed
- // also note then when this happens as part of a transaction it is the tx commit of the ack that is
+ // also note then when this happens as part of a transaction it is the tx commit of the ack that is
// important not this
// Also note that this delete shouldn't sync to disk, or else we would build up the executor's queue
@@ -2152,6 +2215,7 @@
{
try
{
+ depagePending = false;
depage();
}
catch (Exception e)
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -16,7 +16,6 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -84,8 +83,6 @@
private final ServerSession session;
- private final Executor executor;
-
private final Object lock = new Object();
private volatile AtomicInteger availableCredits = new AtomicInteger(0);
@@ -153,8 +150,6 @@
messageQueue = binding.getQueue();
- this.executor = messageQueue.getExecutor();
-
this.started = browseOnly || started;
this.browseOnly = browseOnly;
@@ -376,7 +371,7 @@
Future future = new Future();
- executor.execute(future);
+ messageQueue.getExecutor().execute(future);
boolean ok = future.await(10000);
@@ -483,7 +478,7 @@
Future future = new Future();
- executor.execute(future);
+ messageQueue.getExecutor().execute(future);
boolean ok = future.await(10000);
@@ -668,7 +663,7 @@
{
if (browseOnly)
{
- executor.execute(browserDeliverer);
+ messageQueue.getExecutor().execute(browserDeliverer);
}
else
{
@@ -680,7 +675,7 @@
private void resumeLargeMessage()
{
- executor.execute(resumeLargeMessageRunnable);
+ messageQueue.getExecutor().execute(resumeLargeMessageRunnable);
}
private void deliverLargeMessage(final MessageReference ref, final ServerMessage message) throws Exception
@@ -723,7 +718,7 @@
{
if (browseOnly)
{
- executor.execute(browserDeliverer);
+ messageQueue.getExecutor().execute(browserDeliverer);
}
else
{
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/version/impl/VersionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/version/impl/VersionImpl.java 2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/version/impl/VersionImpl.java 2011-05-17 04:51:31 UTC (rev 10676)
@@ -46,7 +46,7 @@
private final int incrementingVersion;
private final String versionSuffix;
-
+
private final String nettyVersion;
private final int[] compatibleVersionList;
@@ -75,13 +75,12 @@
this.versionSuffix = versionSuffix;
this.nettyVersion = nettyVersion;
-
+
this.compatibleVersionList = compatibleVersionList;
}
// Version implementation ------------------------------------------
- @Override
public String getFullVersion()
{
return majorVersion + "." +
@@ -97,49 +96,41 @@
")";
}
- @Override
public String getVersionName()
{
return versionName;
}
- @Override
public int getMajorVersion()
{
return majorVersion;
}
- @Override
public int getMinorVersion()
{
return minorVersion;
}
- @Override
public int getMicroVersion()
{
return microVersion;
}
- @Override
public String getVersionSuffix()
{
return versionSuffix;
}
- @Override
public int getIncrementingVersion()
{
return incrementingVersion;
}
- @Override
public String getNettyVersion()
{
return nettyVersion;
}
- @Override
public int[] getCompatibleVersionList()
{
return compatibleVersionList;
@@ -154,7 +145,7 @@
{
return true;
}
- if (!(other instanceof Version))
+ if (other instanceof Version == false)
{
return false;
}
More information about the hornetq-commits
mailing list