[hornetq-commits] JBoss hornetq SVN: r10676 - in trunk/hornetq-core/src/main/java/org/hornetq/core: config/impl and 12 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue May 17 00:51:32 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-05-17 00:51:31 -0400 (Tue, 17 May 2011)
New Revision: 10676

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/Topology.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/ConfigurationImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/FileConfiguration.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/journal/SequentialFile.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/paging/PrintPages.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageSubscription.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageSyncTimer.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/version/impl/VersionImpl.java
Log:
syncing branches EAP and trunk with changes on core

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -47,7 +47,6 @@
    /**
     * @return the largeMessageSize
     */
-   @Override
    public long getLargeMessageSize()
    {
       return largeMessageSize;
@@ -56,7 +55,6 @@
    /**
     * @param largeMessageSize the largeMessageSize to set
     */
-   @Override
    public void setLargeMessageSize(long largeMessageSize)
    {
       this.largeMessageSize = largeMessageSize;
@@ -92,7 +90,6 @@
       return true;
    }
 
-   @Override
    public void setLargeMessageController(final LargeMessageController controller)
    {
       largeMessageController = controller;
@@ -112,7 +109,6 @@
       return getLongProperty(Message.HDR_LARGE_BODY_SIZE).intValue();
    }
 
-   @Override
    public LargeMessageController getLargeMessageController()
    {
       return largeMessageController;

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -205,7 +205,6 @@
  
    }
 
-   @Override
    public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection) throws HornetQException
    {
       // Get the connection
@@ -224,28 +223,35 @@
 
    }
 
-   @Override
    public TransportConfiguration getConnectorConfiguration()
    {
       return connectorConfig;
    }
 
-   @Override
    public void setBackupConnector(TransportConfiguration live, TransportConfiguration backUp)
    {
       if(live.equals(connectorConfig) && backUp != null)
       {
+         if (log.isDebugEnabled())
+         {
+              log.debug("Setting up backup config = " + backUp + " for live = " + live);
+         }
          backupConfig = backUp;
       }
+      else
+      {
+         if (log.isDebugEnabled())
+         {
+            log.debug("ClientSessionFactoryImpl received backup update for live/backup pair = " + live + " / " + backUp + " but it didn't belong to " + this.connectorConfig);
+         }
+      }
    }
 
-   @Override
    public Object getBackupConnector()
    {
       return backupConfig;
    }
 
-   @Override
    public ClientSession createSession(final String username,
                                       final String password,
                                       final boolean xa,
@@ -263,7 +269,6 @@
                                    ackBatchSize);
    }
 
-   @Override
    public ClientSession createSession(final boolean autoCommitSends,
                                       final boolean autoCommitAcks,
                                       final int ackBatchSize) throws HornetQException
@@ -277,7 +282,6 @@
                                    ackBatchSize);
    }
 
-   @Override
    public ClientSession createXASession() throws HornetQException
    {
       return createSessionInternal(null,
@@ -289,7 +293,6 @@
                                    serverLocator.getAckBatchSize());
    }
 
-   @Override
    public ClientSession createTransactedSession() throws HornetQException
    {
       return createSessionInternal(null,
@@ -301,7 +304,6 @@
                                    serverLocator.getAckBatchSize());
    }
 
-   @Override
    public ClientSession createSession() throws HornetQException
    {
       return createSessionInternal(null,
@@ -313,7 +315,6 @@
                                    serverLocator.getAckBatchSize());
    }
 
-   @Override
    public ClientSession createSession(final boolean autoCommitSends, final boolean autoCommitAcks) throws HornetQException
    {
       return createSessionInternal(null,
@@ -325,7 +326,6 @@
                                    serverLocator.getAckBatchSize());
    }
 
-   @Override
    public ClientSession createSession(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks) throws HornetQException
    {
       return createSessionInternal(null,
@@ -337,7 +337,6 @@
                                    serverLocator.getAckBatchSize());
    }
 
-   @Override
    public ClientSession createSession(final boolean xa,
                                       final boolean autoCommitSends,
                                       final boolean autoCommitAcks,
@@ -354,19 +353,16 @@
 
    // ConnectionLifeCycleListener implementation --------------------------------------------------
 
-   @Override
    public void connectionCreated(final Connection connection, final ProtocolType protocol)
    {
    }
 
-   @Override
    public void connectionDestroyed(final Object connectionID)
    {
       handleConnectionFailure(connectionID,
                               new HornetQException(HornetQException.NOT_CONNECTED, "Channel disconnected"));
    }
 
-   @Override
    public void connectionException(final Object connectionID, final HornetQException me)
    {
       handleConnectionFailure(connectionID, me);
@@ -374,7 +370,6 @@
 
    // Must be synchronized to prevent it happening concurrently with failover which can lead to
    // inconsistencies
-   @Override
    public void removeSession(final ClientSessionInternal session, boolean failingOver)
    {
       synchronized (sessions)
@@ -383,36 +378,30 @@
       }
    }
    
-   @Override
    public void connectionReadyForWrites(final Object connectionID, final boolean ready)
    {
    }
 
-   @Override
    public synchronized int numConnections()
    {
       return connection != null ? 1 : 0;
    }
 
-   @Override
    public int numSessions()
    {
       return sessions.size();
    }
 
-   @Override
    public void addFailureListener(final SessionFailureListener listener)
    {
       listeners.add(listener);
    }
 
-   @Override
    public boolean removeFailureListener(final SessionFailureListener listener)
    {
       return listeners.remove(listener);
    }
 
-   @Override
    public void causeExit()
    {
       exitLoop = true;
@@ -422,7 +411,6 @@
       }
    }
 
-   @Override
    public void close()
    {
       if (closed)
@@ -461,7 +449,6 @@
       closed = true;
    }
 
-   @Override
    public ServerLocator getServerLocator()
    {
       return serverLocator;
@@ -901,6 +888,11 @@
                return;
             }
 
+            if (log.isDebugEnabled())
+            {
+               log.debug("Trying reconnection attempt " + count);
+            }
+
             getConnection();
 
             if (connection == null)
@@ -910,10 +902,10 @@
                if (reconnectAttempts != 0)
                {
                   count++;
-
+                  
                   if (reconnectAttempts != -1 && count == reconnectAttempts)
                   {
-                     log.warn("Tried " + reconnectAttempts + " times to connect. Now giving up.");
+                     log.warn("Tried " + reconnectAttempts + " times to connect. Now giving up on reconnecting it.");
 
                      return;
                   }
@@ -994,7 +986,6 @@
       }
    }
 
-   @Override
    public CoreRemotingConnection getConnection()
    {
       if (connection == null)
@@ -1016,10 +1007,20 @@
             {
                connector.start();
 
+               if (log.isDebugEnabled())
+               {
+                  log.debug("Trying to connect at the main server using connector :" + connectorConfig);
+               }
+               
                tc = connector.createConnection();
 
                if (tc == null)
                {
+                  if (log.isDebugEnabled())
+                  {
+                     log.debug("Main server is not up. Hopefully there's a backup configured now!");
+                  }
+                  
                   try
                   {
                      connector.close();
@@ -1031,9 +1032,13 @@
                   connector = null;
                }
             }
-            //if connection fails we can try the backup incase it has come live
+            //if connection fails we can try the backup in case it has come live
             if(connector == null && backupConfig != null)
             {
+               if (log.isDebugEnabled())
+               {
+                  log.debug("Trying backup config = " + backupConfig);
+               }
                ConnectorFactory backupConnectorFactory = instantiateConnectorFactory(backupConfig.getFactoryClassName());
                connector = backupConnectorFactory.createConnector(backupConfig.getParams(),
                                                          handler,
@@ -1049,6 +1054,11 @@
 
                   if (tc == null)
                   {
+                     if (log.isDebugEnabled())
+                     {
+                        log.debug("Backup is not active yet");
+                     }
+                     
                      try
                      {
                         connector.close();
@@ -1062,6 +1072,12 @@
                   else
                   {
                      /*looks like the backup is now live, lets use that*/
+                     
+                     if (log.isDebugEnabled())
+                     {
+                        log.debug("Connected to the backup at " + backupConfig);
+                     }
+                     
                      connectorConfig = backupConfig;
 
                      backupConfig = null;
@@ -1155,7 +1171,6 @@
       return connection;
    }
 
-   @Override
    public void finalize() throws Throwable
    {
       if (!closed)
@@ -1175,7 +1190,6 @@
    {
       return AccessController.doPrivileged(new PrivilegedAction<ConnectorFactory>()
       {
-         @Override
          public ConnectorFactory run()
          {
             ClassLoader loader = Thread.currentThread().getContextClassLoader();
@@ -1241,7 +1255,6 @@
          this.conn = conn;
       }
 
-      @Override
       public void handlePacket(final Packet packet)
       {
          final byte type = packet.getType();
@@ -1254,7 +1267,6 @@
             {
                // Must be executed on new thread since cannot block the netty thread for a long time and fail can
                // cause reconnect loop
-               @Override
                public void run()
                {
                   SimpleString nodeID = msg.getNodeID();
@@ -1275,10 +1287,18 @@
 
             if (topMessage.isExit())
             {
+               if (log.isDebugEnabled())
+               {
+                  log.debug("Notifying " + topMessage.getNodeID() + " going down");
+               }
                serverLocator.notifyNodeDown(topMessage.getNodeID());
             }
             else
             {
+               if (log.isDebugEnabled())
+               {
+                  log.debug("Node " + topMessage.getNodeID() + " going up, connector = " + topMessage.getPair() + ", isLast=" + topMessage.isLast());
+               }
                serverLocator.notifyNodeUp(topMessage.getNodeID(),
                                           topMessage.getPair(),
                                           topMessage.isLast());
@@ -1289,7 +1309,6 @@
 
    private class DelegatingBufferHandler implements BufferHandler
    {
-      @Override
       public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
       {
          CoreRemotingConnection theConn = connection;
@@ -1310,7 +1329,6 @@
          this.connectionID = connectionID;
       }
 
-      @Override
       public void connectionFailed(final HornetQException me, boolean failedOver)
       {
          handleConnectionFailure(connectionID, me);
@@ -1326,7 +1344,6 @@
          pingRunnable = new WeakReference<PingRunnable>(runnable);
       }
 
-      @Override
       public void run()
       {
          PingRunnable runnable = pingRunnable.get();
@@ -1347,7 +1364,6 @@
 
       private long lastCheck = System.currentTimeMillis();
 
-      @Override
       public synchronized void run()
       {
          if (cancelled || stopPingingAfterOne && !first)
@@ -1371,7 +1387,6 @@
                threadPool.execute(new Runnable()
                {
                   // Must be executed on different thread
-                  @Override
                   public void run()
                   {
                      connection.fail(me);

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/Topology.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/Topology.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/Topology.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -46,7 +46,7 @@
     */
    private Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
 
-   private boolean debug;
+   private boolean debug = log.isDebugEnabled();
 
    public synchronized boolean addMember(String nodeId, TopologyMember member)
    {
@@ -54,9 +54,9 @@
       TopologyMember currentMember = topology.get(nodeId);
       if (debug)
       {
-         System.out.println("adding = " + nodeId + ":" + member.getConnector());
-         System.out.println("before----------------------------------");
-         System.out.println(describe());
+         log.info("adding = " + nodeId + ":" + member.getConnector());
+         log.info("before----------------------------------");
+         log.info(describe());
       }
       if(currentMember == null)
       {
@@ -87,9 +87,8 @@
       }
       if(debug)
       {
-
-         System.out.println("after----------------------------------updated=" + replaced);
-         System.out.println(describe());
+         log.info("Topology updated=" + replaced);
+         log.info(describe());
       }
       return replaced;
    }
@@ -97,6 +96,10 @@
    public synchronized boolean removeMember(String nodeId)
    {
       TopologyMember member = topology.remove(nodeId);
+      if (debug)
+      {
+         log.info("Removing member " + member);
+      }
       return (member != null);
    }
 

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/ConfigurationImpl.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/ConfigurationImpl.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -157,7 +157,7 @@
    public static final int DEFAULT_ID_CACHE_SIZE = 2000;
 
    public static final boolean DEFAULT_PERSIST_ID_CACHE = true;
-
+   
    public static final boolean DEFAULT_CLUSTER_DUPLICATE_DETECTION = true;
 
    public static final boolean DEFAULT_CLUSTER_FORWARD_WHEN_NO_CONSUMERS = false;
@@ -185,7 +185,7 @@
    public static final String DEFAULT_LOG_DELEGATE_FACTORY_CLASS_NAME = JULLogDelegateFactory.class.getCanonicalName();
 
    // Attributes -----------------------------------------------------------------------------
-
+   
    protected String name = "ConfigurationImpl::" + System.identityHashCode(this);
 
    protected boolean clustered = ConfigurationImpl.DEFAULT_CLUSTERED;
@@ -231,7 +231,7 @@
    protected String logDelegateFactoryClassName = ConfigurationImpl.DEFAULT_LOG_DELEGATE_FACTORY_CLASS_NAME;
 
    protected List<String> interceptorClassNames = new ArrayList<String>();
-
+   
    protected Map<String, TransportConfiguration> connectorConfigs = new HashMap<String, TransportConfiguration>();
 
    protected Set<TransportConfiguration> acceptorConfigs = new HashSet<TransportConfiguration>();
@@ -341,67 +341,56 @@
 
    // Public -------------------------------------------------------------------------
 
-   @Override
    public boolean isClustered()
    {
       return clustered;
    }
 
-   @Override
    public void setClustered(final boolean clustered)
    {
       this.clustered = clustered;
    }
 
-   @Override
    public boolean isAllowAutoFailBack()
    {
       return allowAutoFailBack;
    }
 
-   @Override
    public void setAllowAutoFailBack(boolean allowAutoFailBack)
    {
       this.allowAutoFailBack = allowAutoFailBack;
    }
 
-   @Override
    public boolean isBackup()
    {
       return backup;
    }
 
-   @Override
    public boolean isFileDeploymentEnabled()
    {
       return fileDeploymentEnabled;
    }
 
-   @Override
    public void setFileDeploymentEnabled(final boolean enable)
    {
       fileDeploymentEnabled = enable;
    }
 
-   @Override
    public boolean isPersistenceEnabled()
    {
       return persistenceEnabled;
    }
 
-   @Override
    public void setPersistenceEnabled(final boolean enable)
    {
       persistenceEnabled = enable;
    }
 
-   @Override
    public long getFileDeployerScanPeriod()
    {
       return fileDeploymentScanPeriod;
    }
 
-   @Override
    public void setFileDeployerScanPeriod(final long period)
    {
       fileDeploymentScanPeriod = period;
@@ -410,109 +399,91 @@
    /**
     * @return the persistDeliveryCountBeforeDelivery
     */
-   @Override
    public boolean isPersistDeliveryCountBeforeDelivery()
    {
       return persistDeliveryCountBeforeDelivery;
    }
 
-   @Override
    public void setPersistDeliveryCountBeforeDelivery(final boolean persistDeliveryCountBeforeDelivery)
    {
       this.persistDeliveryCountBeforeDelivery = persistDeliveryCountBeforeDelivery;
    }
 
-   @Override
    public void setBackup(final boolean backup)
    {
       this.backup = backup;
    }
 
-   @Override
    public boolean isSharedStore()
    {
       return sharedStore;
    }
 
-   @Override
    public void setSharedStore(final boolean sharedStore)
    {
       this.sharedStore = sharedStore;
    }
 
-   @Override
    public int getScheduledThreadPoolMaxSize()
    {
       return scheduledThreadPoolMaxSize;
    }
 
-   @Override
    public void setScheduledThreadPoolMaxSize(final int maxSize)
    {
       scheduledThreadPoolMaxSize = maxSize;
    }
 
-   @Override
    public int getThreadPoolMaxSize()
    {
       return threadPoolMaxSize;
    }
 
-   @Override
    public void setThreadPoolMaxSize(final int maxSize)
    {
       threadPoolMaxSize = maxSize;
    }
 
-   @Override
    public long getSecurityInvalidationInterval()
    {
       return securityInvalidationInterval;
    }
 
-   @Override
    public void setSecurityInvalidationInterval(final long interval)
    {
       securityInvalidationInterval = interval;
    }
 
-   @Override
    public long getConnectionTTLOverride()
    {
       return connectionTTLOverride;
    }
 
-   @Override
    public void setConnectionTTLOverride(final long ttl)
    {
       connectionTTLOverride = ttl;
    }
 
-   @Override
    public boolean isAsyncConnectionExecutionEnabled()
    {
       return asyncConnectionExecutionEnabled;
    }
 
-   @Override
    public void setEnabledAsyncConnectionExecution(final boolean enabled)
    {
       asyncConnectionExecutionEnabled = enabled;
    }
 
-   @Override
    public List<String> getInterceptorClassNames()
    {
       return interceptorClassNames;
    }
 
-   @Override
    public void setInterceptorClassNames(final List<String> interceptors)
    {
       interceptorClassNames = interceptors;
    }
 
-   @Override
    public Set<TransportConfiguration> getAcceptorConfigurations()
    {
       return acceptorConfigs;
@@ -528,31 +499,26 @@
       return connectorConfigs;
    }
 
-   @Override
    public void setConnectorConfigurations(final Map<String, TransportConfiguration> infos)
    {
       connectorConfigs = infos;
    }
 
-   @Override
    public String getLiveConnectorName()
    {
       return liveConnectorName;
    }
 
-   @Override
    public void setLiveConnectorName(final String liveConnectorName)
    {
       this.liveConnectorName = liveConnectorName;
    }
-
-   @Override
+   
    public GroupingHandlerConfiguration getGroupingHandlerConfiguration()
    {
       return groupingHandlerConfiguration;
    }
 
-   @Override
    public void setGroupingHandlerConfiguration(final GroupingHandlerConfiguration groupingHandlerConfiguration)
    {
       this.groupingHandlerConfiguration = groupingHandlerConfiguration;

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/FileConfiguration.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/config/impl/FileConfiguration.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -38,7 +38,7 @@
 
    private static final String DEFAULT_CONFIGURATION_URL = "hornetq-configuration.xml";
 
-   // For a bridge confirmations must be activated or sent acknowledgments won't return
+   // For a bridge confirmations must be activated or send acknowledgments won't return
    public static final int DEFAULT_CONFIRMATION_WINDOW_SIZE = 1024 * 1024;
 
    // Static --------------------------------------------------------------------------

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/journal/SequentialFile.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/journal/SequentialFile.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -94,6 +94,8 @@
    void renameTo(String newFileName) throws Exception;
 
    SequentialFile copy();
+   
+   void copyTo(SequentialFile newFileName) throws Exception;
 
    void setTimedBuffer(TimedBuffer buffer);
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -104,6 +104,29 @@
 
       file.delete();
    }
+   
+   public void copyTo(SequentialFile newFileName) throws Exception
+   {
+      log.debug("Copying "  + this + " as " + newFileName);
+      newFileName.open();
+      this.open();
+      
+      
+      ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
+      
+      for (;;)
+      {
+         buffer.rewind();
+         int size = this.read(buffer);
+         newFileName.writeInternal(buffer);
+         if (size < 10 * 1024)
+         {
+            break;
+         }
+      }
+      newFileName.close();
+      this.close();
+   }
 
    public void position(final long pos) throws Exception
    {

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/PrintPages.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/PrintPages.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/PrintPages.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -26,6 +26,7 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -35,11 +36,13 @@
 import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
 import org.hornetq.core.paging.impl.PagingManagerImpl;
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
 import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
@@ -71,11 +74,14 @@
       if (arg.length != 2)
       {
          System.err.println("Usage: PrintPages <page foler> <journal folder>");
+         System.exit(-1);
       }
       try
       {
 
-         Map<Long, Set<PagePosition>> cursorACKs = PrintPages.loadCursorACKs(arg[1]);
+         Pair<Map<Long, Set<PagePosition>>, Set<Long>> cursorACKs = PrintPages.loadCursorACKs(arg[1]);
+         
+         Set<Long> pgTXs = cursorACKs.b;
 
          ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
          final ExecutorService executor = Executors.newFixedThreadPool(10);
@@ -116,7 +122,7 @@
                for (PagedMessage msg : msgs)
                {
                   msg.initMessage(sm);
-                  System.out.print("pg=" + pg + ", msg=" + msgID + "=" + msg.getMessage());
+                  System.out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ", msg=" + msg.getMessage());
                   System.out.print(",Queues = ");
                   long q[] = msg.getQueueIDs();
                   for (int i = 0; i < q.length; i++)
@@ -127,7 +133,7 @@
 
                      boolean acked = false;
 
-                     Set<PagePosition> positions = cursorACKs.get(q[i]);
+                     Set<PagePosition> positions = cursorACKs.a.get(q[i]);
                      if (positions != null)
                      {
                         acked = positions.contains(posCheck);
@@ -143,6 +149,10 @@
                         System.out.print(",");
                      }
                   }
+                  if (msg.getTransactionID() >= 0 && !pgTXs.contains(msg.getTransactionID()))
+                  {
+                     System.out.print(", **PG_TX_NOT_FOUND**");
+                  }
                   System.out.println();
                   msgID++;
                }
@@ -164,7 +174,7 @@
     * @return
     * @throws Exception
     */
-   protected static Map<Long, Set<PagePosition>> loadCursorACKs(final String journalLocation) throws Exception
+   protected static Pair<Map<Long, Set<PagePosition>>, Set<Long>> loadCursorACKs(final String journalLocation) throws Exception
    {
       SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation);
 
@@ -188,14 +198,17 @@
       messagesJournal.load(records, txs, null);
 
       Map<Long, Set<PagePosition>> cursorRecords = new HashMap<Long, Set<PagePosition>>();
+      
+      Set<Long> pgTXs = new HashSet<Long>();
 
       for (RecordInfo record : records)
       {
+         byte[] data = record.data;
+
+         HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+
          if (record.userRecordType == JournalStorageManager.ACKNOWLEDGE_CURSOR)
          {
-            byte[] data = record.data;
-
-            HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
             CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
             encoding.decode(buff);
 
@@ -209,8 +222,28 @@
 
             set.add(encoding.position);
          }
+         else if (record.userRecordType == JournalStorageManager.PAGE_TRANSACTION)
+         {
+            if (record.isUpdate)
+            {
+               PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
+
+               pageUpdate.decode(buff);
+               pgTXs.add(pageUpdate.pageTX);
+            }
+            else
+            {
+               PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+
+               pageTransactionInfo.decode(buff);
+
+               pageTransactionInfo.setRecordID(record.id);
+               pgTXs.add(pageTransactionInfo.getTransactionID());
+            }
+         }
       }
-      return cursorRecords;
+      
+      return new Pair<Map<Long, Set<PagePosition>>, Set<Long>>(cursorRecords, pgTXs);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageSubscription.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PageSubscription.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.paging.cursor;
 
+import java.util.concurrent.Executor;
+
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.server.Queue;
@@ -131,4 +133,9 @@
     * @return
     */
    PagedMessage queryMessage(PagePosition pos);
+
+   /**
+    * @return
+    */
+   Executor getExecutor();
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -14,8 +14,10 @@
 package org.hornetq.core.paging.cursor;
 
 import java.lang.ref.WeakReference;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.api.core.Message;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
@@ -34,6 +36,10 @@
 
    private static final long serialVersionUID = -8640232251318264710L;
 
+   private static final Logger log = Logger.getLogger(PagedReferenceImpl.class);
+
+   private static final boolean isTrace = log.isTraceEnabled();
+
    private final PagePosition position;
 
    private WeakReference<PagedMessage> message;
@@ -42,6 +48,8 @@
 
    private int persistedCount;
 
+   private AtomicInteger deliveryCount = new AtomicInteger(0);
+
    private final PageSubscription subscription;
 
    public ServerMessage getMessage()
@@ -84,12 +92,12 @@
    {
       return true;
    }
-   
+
    public void setPersistedCount(int count)
    {
       this.persistedCount = count;
    }
-   
+
    public int getPersistedCount()
    {
       return persistedCount;
@@ -100,8 +108,7 @@
     */
    public MessageReference copy(final Queue queue)
    {
-      // TODO Auto-generated method stub
-      return null;
+      return new PagedReferenceImpl(this.position, this.getPagedMessage(), this.subscription);
    }
 
    /* (non-Javadoc)
@@ -137,8 +144,7 @@
     */
    public int getDeliveryCount()
    {
-      // TODO Auto-generated method stub
-      return 0;
+      return deliveryCount.get();
    }
 
    /* (non-Javadoc)
@@ -146,8 +152,7 @@
     */
    public void setDeliveryCount(final int deliveryCount)
    {
-      // TODO Auto-generated method stub
-
+      this.deliveryCount.set(deliveryCount);
    }
 
    /* (non-Javadoc)
@@ -155,7 +160,11 @@
     */
    public void incrementDeliveryCount()
    {
-      // TODO Auto-generated method stub
+      deliveryCount.incrementAndGet();
+      if (isTrace)
+      {
+         log.trace("deliveryCount = " + deliveryCount + " for " + this);
+      }
 
    }
 
@@ -164,8 +173,7 @@
     */
    public void decrementDeliveryCount()
    {
-      // TODO Auto-generated method stub
-
+      deliveryCount.decrementAndGet();
    }
 
    /* (non-Javadoc)
@@ -199,4 +207,25 @@
    {
       subscription.ackTx(tx, this);
    }
+
+   /* (non-Javadoc)
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      return "PagedReferenceImpl [position=" + position +
+             ", message=" +
+             message +
+             ", deliveryTime=" +
+             deliveryTime +
+             ", persistedCount=" +
+             persistedCount +
+             ", deliveryCount=" +
+             deliveryCount +
+             ", subscription=" +
+             subscription +
+             "]";
+   }
+
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -30,7 +30,6 @@
 import org.hornetq.core.paging.cursor.PagedReference;
 import org.hornetq.core.paging.cursor.PagedReferenceImpl;
 import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.Future;
 import org.hornetq.utils.SoftValueHashMap;
 import org.jboss.netty.util.internal.ConcurrentHashMap;
@@ -57,8 +56,7 @@
 
    private final StorageManager storageManager;
 
-   private final ExecutorFactory executorFactory;
-
+   // This is the same executor used at the PageStoreImpl. One Executor per pageStore
    private final Executor executor;
 
    private final SoftValueHashMap<Long, PageCache> softCache;
@@ -71,13 +69,12 @@
 
    public PageCursorProviderImpl(final PagingStore pagingStore,
                                  final StorageManager storageManager,
-                                 final ExecutorFactory executorFactory,
+                                 final Executor executor,
                                  final int maxCacheSize)
    {
       this.pagingStore = pagingStore;
       this.storageManager = storageManager;
-      this.executorFactory = executorFactory;
-      this.executor = executorFactory.getExecutor();
+      this.executor = executor;
       this.softCache = new SoftValueHashMap<Long, PageCache>(maxCacheSize);
    }
 
@@ -96,13 +93,7 @@
          throw new IllegalStateException("Cursor " + cursorID + " had already been created");
       }
 
-      activeCursor = new PageSubscriptionImpl(this,
-                                              pagingStore,
-                                              storageManager,
-                                              executorFactory.getExecutor(),
-                                              filter,
-                                              cursorID,
-                                              persistent);
+      activeCursor = new PageSubscriptionImpl(this, pagingStore, storageManager, executor, filter, cursorID, persistent);
       activeCursors.put(cursorID, activeCursor);
       return activeCursor;
    }
@@ -389,6 +380,17 @@
             {
                pagingStore.stopPaging();
             }
+            else
+            {
+               if (log.isTraceEnabled())
+               {
+                  log.trace("Couldn't cleanup page on address " + this.pagingStore.getAddress() +
+                            " as numberOfPages == " +
+                            pagingStore.getNumberOfPages() +
+                            " and currentPage.numberOfMessages = " +
+                            pagingStore.getCurrentPage().getNumberOfMessages());
+               }
+            }
          }
          catch (Exception ex)
          {
@@ -411,7 +413,7 @@
             {
                cache = softCache.remove((long)depagedPage.getPageId());
             }
-            
+
             if (cache == null)
             {
                // The page is not on cache any more
@@ -426,7 +428,7 @@
             {
                pgdMessages = cache.getMessages();
             }
-            
+
             depagedPage.delete(pgdMessages);
             synchronized (softCache)
             {

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -21,6 +21,7 @@
 
 import org.hornetq.api.core.Pair;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.MessageReference;
@@ -40,6 +41,8 @@
 
    // Constants -----------------------------------------------------
    static final Logger log = Logger.getLogger(PageSubscriptionCounterImpl.class);
+   
+   static final boolean isTrace = log.isTraceEnabled();
 
    // Attributes ----------------------------------------------------
 
@@ -51,8 +54,12 @@
    private long recordID = -1;
 
    private boolean persistent;
+   
+   private final PageSubscription subscription;
 
    private final StorageManager storage;
+   
+   private final Executor executor;
 
    private final AtomicLong value = new AtomicLong(0);
 
@@ -60,8 +67,6 @@
 
    private LinkedList<Pair<Long, Integer>> loadList;
 
-   private final Executor executor;
-
    private final Runnable cleanupCheck = new Runnable()
    {
       public void run()
@@ -77,14 +82,16 @@
    // Constructors --------------------------------------------------
 
    public PageSubscriptionCounterImpl(final StorageManager storage,
+                                      final PageSubscription subscription,
+                                      final Executor executor,
                                       final boolean persistent,
-                                      final long subscriptionID,
-                                      final Executor executor)
+                                      final long subscriptionID)
    {
       this.subscriptionID = subscriptionID;
+      this.executor = executor;
       this.storage = storage;
-      this.executor = executor;
       this.persistent = persistent;
+      this.subscription = subscription;
    }
 
    /* (non-Javadoc)
@@ -253,10 +260,13 @@
          }
 
          newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace);
+         
+         if (isTrace)
+         {
+            log.trace("Replacing page-counter record = "  + recordID + " by record = " + newRecordID + " on subscriptionID = " + this.subscriptionID + " for queue = " + this.subscription.getQueue().getName());
+         }
 
          storage.commit(txCleanup);
-
-         storage.waitOnOperations();
      }
       catch (Exception e)
       {

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -92,8 +92,6 @@
 
    private final PageCursorProvider cursorProvider;
 
-   private final Executor executor;
-
    private volatile PagePosition lastAckedPosition;
 
    private List<PagePosition> recoveredACK;
@@ -101,6 +99,8 @@
    private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
 
    private final PageSubscriptionCounter counter;
+   
+   private final Executor executor;
 
    private final AtomicLong deliveredCount = new AtomicLong(0);
 
@@ -126,7 +126,7 @@
       this.executor = executor;
       this.filter = filter;
       this.persistent = persistent;
-      this.counter = new PageSubscriptionCounterImpl(store, persistent, cursorId, executor);
+      this.counter = new PageSubscriptionCounterImpl(store, this, executor, persistent, cursorId);
    }
 
    // Public --------------------------------------------------------
@@ -224,7 +224,7 @@
       // First get the completed pages using a lock
       synchronized (this)
       {
-         for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
+         for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet()) 
          {
             PageCursorInfo info = entry.getValue();
             if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
@@ -687,6 +687,14 @@
       }
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.PageSubscription#executeWithContext(java.lang.Runnable)
+    */
+   public Executor getExecutor()
+   {
+      return executor;
+   }
+
    private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
    {
       return getPageInfo(pos, true);
@@ -734,8 +742,17 @@
    {
       if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
       {
+         if (isTrace)
+         {
+            log.trace("a new position is being processed as ACK");
+         }
          if (lastAckedPosition != null && lastAckedPosition.getPageNr() != pos.getPageNr())
          {
+            if (isTrace)
+            {
+               log.trace("Scheduling cleanup on pageSubscription for address = " + pageStore.getAddress() + " queue = " + this.getQueue().getName());
+            }
+            
             // there's a different page being acked, we will do the check right away
             if (autoCleanup)
             {
@@ -780,7 +797,7 @@
 
    private PageTransactionInfo getPageTransaction(final PagedReference reference)
    {
-      if (reference.getPagedMessage().getTransactionID() != 0)
+      if (reference.getPagedMessage().getTransactionID() >= 0)
       {
          return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID());
       }
@@ -816,6 +833,7 @@
 
       private final long pageId;
 
+      // TODO: Merge removedReferences and acks into a single structure
       // Confirmed ACKs on this page
       private final Set<PagePosition> acks = Collections.synchronizedSet(new LinkedHashSet<PagePosition>());
 
@@ -1135,6 +1153,13 @@
                {
                   ignored = true;
                }
+               
+               PageCursorInfo info = getPageInfo(message.getPosition(), false);
+               
+               if (info != null && info.isRemoved(message.getPosition()))
+               {
+                  continue;
+               }
 
                // 2nd ... if TX, is it committed?
                if (valid && message.getPagedMessage().getTransactionID() >= 0)
@@ -1145,7 +1170,7 @@
                   {
                      log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
                               ", ignoring message on position " +
-                              message.getPosition());
+                              message.getPosition() + " on address=" + pageStore.getAddress() + " queue=" + queue.getName());
                      valid = false;
                      ignored = true;
                   }
@@ -1166,7 +1191,7 @@
                   // Say you have a Browser that will only read the files... there's no need to control PageCursors is
                   // nothing
                   // is being changed. That's why the false is passed as a parameter here
-                  PageCursorInfo info = getPageInfo(message.getPosition(), false);
+                 
                   if (info != null && info.isRemoved(message.getPosition()))
                   {
                      valid = false;

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageSyncTimer.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageSyncTimer.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageSyncTimer.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -19,6 +19,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.persistence.OperationContext;
 
@@ -34,6 +35,9 @@
 
    // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(PageSyncTimer.class);
+
+
    // Attributes ----------------------------------------------------
    
    private final PagingStore store;
@@ -83,6 +87,7 @@
       OperationContext [] pendingSyncsArray;
       synchronized (this)
       {
+         
          pendingSync = false;
          pendingSyncsArray = new OperationContext[syncOperations.size()];
          pendingSyncsArray = syncOperations.toArray(pendingSyncsArray);
@@ -91,7 +96,10 @@
       
       try
       {
-         store.ioSync();
+         if (pendingSyncsArray.length != 0)
+         {
+            store.ioSync();
+         }
       }
       catch (Exception e)
       {

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -84,6 +84,7 @@
          HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(largeMessageLazyData);
          message.decodeHeadersAndProperties(buffer);
          lgMessage.incrementDelayDeletionCount();
+         lgMessage.setPaged();
          largeMessageLazyData = null;
       }
    }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -106,12 +106,11 @@
                                  syncTimeout,
                                  pagingManager,
                                  storageManager,
-                                 postOffice,
                                  null,
                                  this,
                                  address,
                                  settings,
-                                 executorFactory,
+                                 executorFactory.getExecutor(),
                                  syncNonTransactional);
    }
 
@@ -212,12 +211,11 @@
                                                     syncTimeout,
                                                     pagingManager,
                                                     storageManager,
-                                                    postOffice,
                                                     factory,
                                                     this,
                                                     address,
                                                     settings,
-                                                    executorFactory,
+                                                    executorFactory.getExecutor(),
                                                     syncNonTransactional);
 
             storesReturn.add(store);

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -46,6 +46,7 @@
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.RouteContextList;
 import org.hornetq.core.server.RoutingContext;
@@ -80,8 +81,6 @@
 
    private final StorageManager storageManager;
 
-   private final PostOffice postOffice;
-
    private final DecimalFormat format = new DecimalFormat("000000000");
 
    private final AtomicInteger currentPageSize = new AtomicInteger(0);
@@ -148,12 +147,11 @@
                           final long syncTimeout,
                           final PagingManager pagingManager,
                           final StorageManager storageManager,
-                          final PostOffice postOffice,
                           final SequentialFileFactory fileFactory,
                           final PagingStoreFactory storeFactory,
                           final SimpleString storeName,
                           final AddressSettings addressSettings,
-                          final ExecutorFactory executorFactory,
+                          final Executor executor,
                           final boolean syncNonTransactional)
    {
       if (pagingManager == null)
@@ -165,8 +163,6 @@
 
       this.storageManager = storageManager;
 
-      this.postOffice = postOffice;
-
       this.storeName = storeName;
 
       applySetting(addressSettings);
@@ -181,7 +177,7 @@
                                          pageSize);
       }
 
-      this.executor = executorFactory.getExecutor();
+      this.executor = executor;
 
       this.pagingManager = pagingManager;
 
@@ -200,7 +196,7 @@
          this.syncTimer = null;
       }
 
-      this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory, addressSettings.getPageCacheMaxSize());
+      this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executor, addressSettings.getPageCacheMaxSize());
 
    }
 
@@ -870,6 +866,11 @@
 
 
          PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), tx == null ? -1 : tx.getID());
+         
+         if (message.isLargeMessage())
+         {
+            ((LargeServerMessage)message).setPaged();
+         }
 
          int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
 
@@ -879,20 +880,19 @@
             openNewPage();
             currentPageSize.addAndGet(bytesToWrite);
          }
-         
-         installPageTransaction(tx, listCtx, currentPage.getPageId());
  
          currentPage.write(pagedMessage);
-
-         if (sync || tx != null)
-         {
-            sync();
-         }
          
          if (tx != null)
          {
+            installPageTransaction(tx, listCtx);
             tx.setWaitBeforeCommit(true);
          }
+         else
+         if (sync && tx == null)
+         {
+            sync();
+         }
 
          return true;
       }
@@ -924,38 +924,46 @@
       return ids;
    }
 
-   private PageTransactionInfo installPageTransaction(final Transaction tx, final RouteContextList listCtx, int pageID) throws Exception
+   private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception
    {
-      if (tx == null)
+      FinishPageMessageOperation pgOper = (FinishPageMessageOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+      if (pgOper == null)
       {
-         return null;
+         PageTransactionInfo pgTX = new PageTransactionInfoImpl(tx.getID());
+         pagingManager.addTransaction(pgTX);
+         pgOper = new FinishPageMessageOperation(pgTX, storageManager, pagingManager);
+         tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgOper);
+         tx.addOperation(pgOper);
       }
-      else
-      {
-         PageTransactionInfo pgTX = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
-         if (pgTX == null)
-         {
-            pgTX = new PageTransactionInfoImpl(tx.getID());
-            pagingManager.addTransaction(pgTX);
-            tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgTX);
-            tx.addOperation(new FinishPageMessageOperation(pgTX));
-         }
 
-         pgTX.increment(listCtx.getNumberOfQueues());
+      pgOper.addStore(this);
+      pgOper.pageTransaction.increment(listCtx.getNumberOfQueues());
 
-         return pgTX;
-      }
+      return;
    }
 
-   private class FinishPageMessageOperation implements TransactionOperation
+   private static class FinishPageMessageOperation implements TransactionOperation
    {
-      private final PageTransactionInfo pageTransaction;
+      public final PageTransactionInfo pageTransaction;
+      
+      private final StorageManager storageManager;
+      
+      private final PagingManager pagingManager;
+      
+      private final Set<PagingStore> usedStores = new HashSet<PagingStore>();
 
       private boolean stored = false;
+      
+      public void addStore(PagingStore store)
+      {
+         this.usedStores.add(store);
+      }
 
-      public FinishPageMessageOperation(final PageTransactionInfo pageTransaction)
+      public FinishPageMessageOperation(final PageTransactionInfo pageTransaction, final StorageManager storageManager, final PagingManager pagingManager)
       {
          this.pageTransaction = pageTransaction;
+         this.storageManager = storageManager;
+         this.pagingManager = pagingManager;
       }
 
       public void afterCommit(final Transaction tx)
@@ -984,11 +992,24 @@
 
       public void beforeCommit(final Transaction tx) throws Exception
       {
+         syncStore();
          storePageTX(tx);
       }
 
+      /**
+       * @throws Exception
+       */
+      private void syncStore() throws Exception
+      {
+         for (PagingStore store : usedStores)
+         {
+            store.sync();
+         }
+      }
+
       public void beforePrepare(final Transaction tx) throws Exception
       {
+         syncStore();
          storePageTX(tx);
       }
 

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -2402,7 +2402,7 @@
       }
    }
 
-   private static class PageUpdateTXEncoding implements EncodingSupport
+   public static class PageUpdateTXEncoding implements EncodingSupport
    {
 
       public long pageTX;

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -18,6 +18,7 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.BodyEncoder;
@@ -48,6 +49,8 @@
    private final JournalStorageManager storageManager;
 
    private LargeServerMessage linkMessage;
+   
+   private boolean paged;
 
    // We should only use the NIO implementation on the Journal
    private SequentialFile file;
@@ -82,6 +85,11 @@
 
    // Public --------------------------------------------------------
 
+   public void setPaged()
+   {
+      paged = true;
+   }
+   
    /* (non-Javadoc)
     * @see org.hornetq.core.server.LargeServerMessage#addBytes(byte[])
     */
@@ -260,27 +268,69 @@
          }
       }
    }
+   
 
+   public void setOriginalHeaders(final ServerMessage other, final boolean expiry)
+   {
+      super.setOriginalHeaders(other, expiry);
+      
+      LargeServerMessageImpl otherLM = (LargeServerMessageImpl)other;
+      this.paged = otherLM.paged;
+      if (this.paged)
+      {
+         this.removeProperty(Message.HDR_ORIG_MESSAGE_ID); 
+      }
+   }
+
+
    @Override
    public synchronized ServerMessage copy(final long newID)
    {
-      incrementDelayDeletionCount();
-
-      long idToUse = messageID;
-
-      if (linkMessage != null)
+      if (!paged)
       {
-         idToUse = linkMessage.getMessageID();
+         incrementDelayDeletionCount();
+   
+         long idToUse = messageID;
+   
+         if (linkMessage != null)
+         {
+            idToUse = linkMessage.getMessageID();
+         }
+   
+         SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
+   
+         ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+                                                                                  : (LargeServerMessageImpl)linkMessage,
+                                                               newfile,
+                                                               newID);
+         return newMessage;
       }
-
-      SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
-
-      ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
-                                                                               : (LargeServerMessageImpl)linkMessage,
-                                                            newfile,
-                                                            newID);
-
-      return newMessage;
+      else
+      {
+         try
+         {
+            validateFile();
+            
+            SequentialFile file = this.file;
+            
+            SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
+            
+            file.copyTo(newFile);
+            
+            LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
+            
+            newMessage.linkMessage = null;
+            
+            newMessage.setPaged();
+            
+            return newMessage;
+         }
+         catch (Exception e)
+         {
+            log.warn("Error on copying large message this for DLA or Expiry", e);
+            return null;
+         }
+      }
    }
 
    public SequentialFile getFile()

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -157,7 +157,14 @@
       return "LargeServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress()  + ",properties=" + properties.toString() + "]";
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.LargeServerMessage#setPaged()
+    */
+   public void setPaged()
+   {
+   }
 
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -267,7 +267,6 @@
                                                              configuration);
    }
 
-   @Override
    public synchronized void start() throws Exception
    {
       if (channelFactory != null)
@@ -330,7 +329,6 @@
 
       ChannelPipelineFactory factory = new ChannelPipelineFactory()
       {
-         @Override
          public ChannelPipeline getPipeline() throws Exception
          {
             Map<String, ChannelHandler> handlers = new LinkedHashMap<String, ChannelHandler>();
@@ -349,7 +347,7 @@
             if (httpEnabled)
             {
                handlers.put("http-decoder", new HttpRequestDecoder());
-
+               
                handlers.put("http-aggregator", new HttpChunkAggregator(Integer.MAX_VALUE));
 
                handlers.put("http-encoder", new HttpResponseEncoder());
@@ -486,7 +484,6 @@
       }
    }
 
-   @Override
    public synchronized void stop()
    {
       if (channelFactory == null)
@@ -562,13 +559,11 @@
       paused = false;
    }
 
-   @Override
    public boolean isStarted()
    {
       return channelFactory != null;
    }
 
-   @Override
    public void pause()
    {
       if (paused)
@@ -611,7 +606,6 @@
       paused = true;
    }
 
-   @Override
    public void setNotificationService(final NotificationService notificationService)
    {
       this.notificationService = notificationService;
@@ -638,7 +632,6 @@
          {
             sslHandler.handshake().addListener(new ChannelFutureListener()
             {
-               @Override
                public void operationComplete(final ChannelFuture future) throws Exception
                {
                   if (future.isSuccess())
@@ -661,7 +654,6 @@
 
    private class Listener implements ConnectionLifeCycleListener
    {
-      @Override
       public void connectionCreated(final Connection connection, final ProtocolType protocol)
       {
          if (connections.putIfAbsent(connection.getID(), (NettyConnection)connection) != null)
@@ -672,7 +664,6 @@
          listener.connectionCreated(connection, NettyAcceptor.this.protocol);
       }
 
-      @Override
       public void connectionDestroyed(final Object connectionID)
       {
          if (connections.remove(connectionID) != null)
@@ -681,7 +672,6 @@
          }
       }
 
-      @Override
       public void connectionException(final Object connectionID, final HornetQException me)
       {
          // Execute on different thread to avoid deadlocks
@@ -696,23 +686,21 @@
 
       }
 
-      @Override
       public void connectionReadyForWrites(final Object connectionID, boolean ready)
       {
          NettyConnection conn = connections.get(connectionID);
-
+         
          if (conn != null)
          {
             conn.fireReady(ready);
-         }
-      }
+         }         
+      }            
    }
 
    private class BatchFlusher implements Runnable
    {
       private boolean cancelled;
 
-      @Override
       public synchronized void run()
       {
          if (!cancelled)

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -30,7 +30,13 @@
    void setLinkedMessage(LargeServerMessage message);
 
    boolean isFileExists() throws Exception;
-
+   
+   /**
+    * We have to copy the large message content in case of DLQ and paged messages
+    * For that we need to pre-mark the LargeMessage with a flag when it is paged
+    */
+   void setPaged();
+   
    /** Close the files if opened */
    void releaseResources();
 

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -72,6 +72,8 @@
 public class QueueImpl implements Queue
 {
    private static final Logger log = Logger.getLogger(QueueImpl.class);
+   
+   private static final boolean isTrace = log.isTraceEnabled();
 
    public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
 
@@ -119,6 +121,8 @@
 
    private final Runnable deliverRunner = new DeliverRunner();
 
+   private volatile boolean depagePending = false;
+   
    private final Runnable depageRunner = new DepageRunner();
 
    private final StorageManager storageManager;
@@ -396,7 +400,7 @@
 
    public void deliverAsync()
    {
-      executor.execute(deliverRunner);
+      getExecutor().execute(deliverRunner);
    }
 
    public void close() throws Exception
@@ -411,7 +415,15 @@
 
    public Executor getExecutor()
    {
-      return executor;
+      if (pageSubscription != null && pageSubscription.isPaging())
+      {
+         // When in page mode, we don't want to have concurrent IO on the same PageStore
+         return pageSubscription.getExecutor();
+      }
+      else
+      {
+         return executor;
+      }
    }
 
    /* Only used on tests */
@@ -432,7 +444,7 @@
 
       if (!ok)
       {
-         log.warn("Couldn't finish waiting executors. Try increasing the thread pool size");
+         log.warn("Couldn't finish waiting executors. Try increasing the thread pool size", new Exception ("trace"));
       }
       
       return ok;
@@ -827,10 +839,18 @@
    {
       if (expiryAddress != null)
       {
+         if (isTrace)
+         {
+            log.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName(), new Exception ("trace"));
+         }
          move(expiryAddress, ref, true, false);
       }
       else
       {
+         if (isTrace)
+         {
+            log.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName());
+         }
          acknowledge(ref);
       }
    }
@@ -1440,6 +1460,8 @@
       int numRefs = messageReferences.size();
 
       int handled = 0;
+      
+      long timeout = System.currentTimeMillis() + 1000;
 
       while (handled < numRefs)
       {
@@ -1451,6 +1473,19 @@
 
             return;
          }
+         
+         if (pageSubscription != null && pageSubscription.isPaging() && System.currentTimeMillis() > timeout)
+         {
+            if (isTrace)
+            {
+               log.trace("Page delivery has been running for too long. Scheduling another delivery task now");
+            }
+            
+            deliverAsync();
+            
+            return;
+         }
+         
 
          ConsumerHolder holder = consumerList.get(pos);
 
@@ -1549,7 +1584,7 @@
          }
       }
 
-      if (pageIterator != null && messageReferences.size() == 0 && pageIterator.hasNext())
+      if (pageIterator != null && messageReferences.size() == 0 && pageSubscription.isPaging() && pageIterator.hasNext()) 
       {
          scheduleDepage();
       }
@@ -1580,12 +1615,20 @@
 
    private void scheduleDepage()
    {
-      executor.execute(depageRunner);
+      if (!depagePending)
+      {
+         if (isTrace)
+         {
+            log.trace("Scheduling depage for queue " + this.getName());
+         }
+         depagePending = true;
+         pageSubscription.getExecutor().execute(depageRunner);
+      }
    }
 
    private void depage()
    {
-      if (paused || pageIterator == null || consumerList.isEmpty())
+      if (paused || pageIterator == null)
       {
          return;
       }
@@ -1593,13 +1636,15 @@
       long maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
 
       // System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
+      int depaged = 0;
       while (queueMemorySize.get() < maxSize && pageIterator.hasNext())
       {
+         depaged++;
          PagedReference reference = pageIterator.next();
          addTail(reference, false);
          pageIterator.remove();
       }
-      // System.out.println("QueueMemorySize after depage = " + queueMemorySize.get() + " depaged " + nmessages);
+      log.debug("Queue Memory Size after depage on queue="+this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages");
 
       deliverAsync();
    }
@@ -1629,11 +1674,13 @@
 
       if (internalQueue)
       {
+         if (isTrace)
+         {
+            log.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery");
+         }
          // no DLQ check on internal queues
          return true;
       }
-
-      // TODO: DeliveryCount on paging
       
       if (!internalQueue && message.isDurable() && durable && !reference.isPaged())
       {
@@ -1647,7 +1694,11 @@
       // First check DLA
       if (maxDeliveries > 0 && reference.getDeliveryCount() >= maxDeliveries)
       {
-         sendToDeadLetterAddress(reference);
+         if (isTrace)
+         {
+            log.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() +  " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName());
+         }
+         sendToDeadLetterAddress(reference, addressSettings.getDeadLetterAddress());
 
          return false;
       }
@@ -1658,6 +1709,10 @@
 
          if (redeliveryDelay > 0)
          {
+            if (isTrace)
+            {
+               log.trace("Setting redeliveryDelay=" + redeliveryDelay + " on reference=" + reference);
+            }
             reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
             
             if (message.isDurable() && durable)
@@ -1738,10 +1793,14 @@
       }
    }
 
+   
    private void sendToDeadLetterAddress(final MessageReference ref) throws Exception
    {
-      SimpleString deadLetterAddress = addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress();
-
+      sendToDeadLetterAddress(ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
+   }
+   
+   private void sendToDeadLetterAddress(final MessageReference ref, final  SimpleString deadLetterAddress) throws Exception
+   {
       if (deadLetterAddress != null)
       {
          Bindings bindingList = postOffice.getBindingsForAddress(deadLetterAddress);
@@ -1859,6 +1918,10 @@
    {
       if (reference.getMessage().isExpired())
       {
+         if (isTrace)
+         {
+            log.trace("Reference " + reference + " is expired");
+         }
          reference.handled();
 
          try
@@ -1941,7 +2004,7 @@
             // ack isn't committed, then the server crashes and on
             // recovery the message is deleted even though the other ack never committed
 
-            // also note then when this happens as part of a transaction it is the tx commit of the ack that is
+            // also note then when this happens as part of a transaction it is the tx commit of the ack that is 
             // important not this
 
             // Also note that this delete shouldn't sync to disk, or else we would build up the executor's queue
@@ -2152,6 +2215,7 @@
       {
          try
          {
+            depagePending = false;
             depage();
          }
          catch (Exception e)

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -16,7 +16,6 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -84,8 +83,6 @@
 
    private final ServerSession session;
 
-   private final Executor executor;
-
    private final Object lock = new Object();
 
    private volatile AtomicInteger availableCredits = new AtomicInteger(0);
@@ -153,8 +150,6 @@
 
       messageQueue = binding.getQueue();
 
-      this.executor = messageQueue.getExecutor();
-
       this.started = browseOnly || started;
 
       this.browseOnly = browseOnly;
@@ -376,7 +371,7 @@
 
       Future future = new Future();
 
-      executor.execute(future);
+      messageQueue.getExecutor().execute(future);
 
       boolean ok = future.await(10000);
 
@@ -483,7 +478,7 @@
 
          Future future = new Future();
 
-         executor.execute(future);
+         messageQueue.getExecutor().execute(future);
 
          boolean ok = future.await(10000);
 
@@ -668,7 +663,7 @@
          {
             if (browseOnly)
             {
-               executor.execute(browserDeliverer);
+               messageQueue.getExecutor().execute(browserDeliverer);
             }
             else
             {
@@ -680,7 +675,7 @@
 
    private void resumeLargeMessage()
    {
-      executor.execute(resumeLargeMessageRunnable);
+      messageQueue.getExecutor().execute(resumeLargeMessageRunnable);
    }
 
    private void deliverLargeMessage(final MessageReference ref, final ServerMessage message) throws Exception
@@ -723,7 +718,7 @@
                {
                   if (browseOnly)
                   {
-                     executor.execute(browserDeliverer);
+                     messageQueue.getExecutor().execute(browserDeliverer);
                   }
                   else
                   {

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/version/impl/VersionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/version/impl/VersionImpl.java	2011-05-17 04:37:18 UTC (rev 10675)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/version/impl/VersionImpl.java	2011-05-17 04:51:31 UTC (rev 10676)
@@ -46,7 +46,7 @@
    private final int incrementingVersion;
 
    private final String versionSuffix;
-
+   
    private final String nettyVersion;
 
    private final int[] compatibleVersionList;
@@ -75,13 +75,12 @@
       this.versionSuffix = versionSuffix;
 
       this.nettyVersion = nettyVersion;
-
+      
       this.compatibleVersionList = compatibleVersionList;
    }
 
    // Version implementation ------------------------------------------
 
-   @Override
    public String getFullVersion()
    {
       return majorVersion + "." +
@@ -97,49 +96,41 @@
              ")";
    }
 
-   @Override
    public String getVersionName()
    {
       return versionName;
    }
 
-   @Override
    public int getMajorVersion()
    {
       return majorVersion;
    }
 
-   @Override
    public int getMinorVersion()
    {
       return minorVersion;
    }
 
-   @Override
    public int getMicroVersion()
    {
       return microVersion;
    }
 
-   @Override
    public String getVersionSuffix()
    {
       return versionSuffix;
    }
 
-   @Override
    public int getIncrementingVersion()
    {
       return incrementingVersion;
    }
 
-   @Override
    public String getNettyVersion()
    {
       return nettyVersion;
    }
 
-   @Override
    public int[] getCompatibleVersionList()
    {
       return compatibleVersionList;
@@ -154,7 +145,7 @@
       {
          return true;
       }
-      if (!(other instanceof Version))
+      if (other instanceof Version == false)
       {
          return false;
       }



More information about the hornetq-commits mailing list