[jboss-cvs] JBoss Messaging SVN: r4837 - in trunk/src/main/org/jboss/messaging/core: remoting/impl/wireformat and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Aug 19 15:10:48 EDT 2008


Author: timfox
Date: 2008-08-19 15:10:48 -0400 (Tue, 19 Aug 2008)
New Revision: 4837

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java
   trunk/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/server/CommandManager.java
   trunk/src/main/org/jboss/messaging/core/server/impl/CommandManagerImpl.java
Log:
Removed connection


Added: trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java	2008-08-19 19:10:48 UTC (rev 4837)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.spi.Connection;
+
+/**
+ *
+ * A ConnectionLifeCycleListener
+ *
+ * Callback interface for connection life-cycle events. It declares three
+ * notifications:
+ *
+ * - connectionCreated receives the newly created {@link Connection} itself;
+ * - connectionDestroyed receives only the ID of the connection;
+ * - connectionException receives the ID of the connection together with the
+ *   {@link MessagingException} being reported.
+ *
+ * NOTE(review): the concrete type behind connectionID (declared as Object) and
+ * the thread on which these callbacks are invoked are not visible in this
+ * file - confirm against the implementations before relying on either.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface ConnectionLifeCycleListener
+{
+   void connectionCreated(Connection connection);
+
+   void connectionDestroyed(Object connectionID);
+
+   void connectionException(Object connectionID, MessagingException me);
+}

Added: trunk/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java	2008-08-19 19:10:48 UTC (rev 4837)
@@ -0,0 +1,125 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * A MessagingBuffer
+ *
+ * Buffer abstraction declared in the remoting package. The interface groups
+ * three kinds of operations:
+ *
+ * - put methods for bytes, byte arrays, the Java primitives, String and
+ *   SimpleString (each with a "nullable" variant) and UTF strings. putInt is
+ *   the only put that also has an overload taking an explicit position.
+ * - get methods mirroring the puts, plus getUnsignedByte and getUnsignedShort
+ *   which return the next wider type.
+ * - cursor and view methods: array, remaining, capacity, limit, flip,
+ *   position, rewind, slice, createNewBuffer and getUnderlyingBuffer.
+ *
+ * putUTF and getUTF are the only methods declared to throw a checked
+ * exception.
+ *
+ * NOTE(review): the method names suggest the same semantics as
+ * java.nio.ByteBuffer (relative reads/writes advancing the position, flip
+ * setting the limit to the position, and so on), but nothing in this file
+ * establishes that - confirm against the implementations. The type returned
+ * by getUnderlyingBuffer is likewise implementation-specific.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface MessagingBuffer
+{
+   void putByte(byte val);
+
+   void putBytes(byte[] bytes);
+
+   void putBytes(byte[] bytes, int offset, int length);
+
+   void putInt(int val);
+
+   void putInt(int pos, int val);
+
+   void putLong(long val);
+
+   void putShort(short val);
+
+   void putDouble(double val);
+
+   void putFloat(float val);
+
+   void putBoolean(boolean val);
+
+   void putChar(char val);
+
+   void putNullableString(String val);
+
+   void putString(String val);
+
+   void putSimpleString(SimpleString val);
+
+   void putNullableSimpleString(SimpleString val);
+
+   void putUTF(String utf) throws Exception;
+
+   byte getByte();
+
+   short getUnsignedByte();
+
+   void getBytes(byte[] bytes);
+
+   void getBytes(byte[] bytes, int offset, int length);
+
+   int getInt();
+
+   long getLong();
+
+   short getShort();
+
+   int getUnsignedShort();
+
+   double getDouble();
+
+   float getFloat();
+
+   boolean getBoolean();
+
+   char getChar();
+
+   String getString();
+
+   String getNullableString();
+
+   SimpleString getSimpleString();
+
+   SimpleString getNullableSimpleString();
+
+   String getUTF() throws Exception;
+
+   byte[] array();
+
+   int remaining();
+
+   int capacity();
+
+   int limit();
+
+   void limit(int limit);
+
+   void flip();
+
+   void position(int position);
+
+   int position();
+
+   void rewind();
+
+   MessagingBuffer slice();
+
+   MessagingBuffer createNewBuffer(int len);
+
+   Object getUnderlyingBuffer();
+}

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryMessage.java	2008-08-19 19:10:48 UTC (rev 4837)
@@ -0,0 +1,100 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */ 
+
+package org.jboss.messaging.core.remoting.impl.wireformat.cluster;
+
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+
+/**
+ * 
+ * A ConsumerReplicateDeliveryMessage
+ * 
+ * Packet of type REPLICATE_DELIVERY whose body consists of a single long,
+ * the ID of a message. The no-argument constructor creates an empty instance
+ * whose messageID is filled in later by {@link #decodeBody(MessagingBuffer)}.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ConsumerReplicateDeliveryMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Not final: the no-argument constructor leaves it at 0 until decodeBody assigns it
+   private long messageID;
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates a packet carrying the given message ID.
+    * 
+    * @param messageID the message ID written by encodeBody
+    */
+   public ConsumerReplicateDeliveryMessage(final long messageID)
+   {
+      super(REPLICATE_DELIVERY);
+
+      this.messageID = messageID;
+   }
+   
+   /**
+    * Creates an empty packet; messageID stays 0 until decodeBody is called.
+    */
+   public ConsumerReplicateDeliveryMessage()
+   {
+      super(REPLICATE_DELIVERY);
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the message ID carried by this packet
+    */
+   public long getMessageID()
+   {
+      return messageID;
+   }
+   
+   /**
+    * Writes the body of this packet: the message ID as a single long.
+    * 
+    * @param buffer the buffer to write to
+    */
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putLong(messageID);
+   }
+   
+   /**
+    * Reads the body of this packet: a single long stored as the message ID.
+    * 
+    * @param buffer the buffer to read from
+    */
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      messageID = buffer.getLong();
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", messageID=" + messageID + "]";
+   }
+   
+   /**
+    * Two instances are equal when the superclass considers them equal and
+    * they carry the same message ID.
+    */
+   @Override
+   public boolean equals(Object other)
+   {
+      if (!(other instanceof ConsumerReplicateDeliveryMessage))
+      {
+         return false;
+      }
+            
+      ConsumerReplicateDeliveryMessage r = (ConsumerReplicateDeliveryMessage)other;
+      
+      return super.equals(other) && this.messageID == r.messageID;
+   }
+   
+   /**
+    * Hash code matching {@link #equals(Object)}. It is derived from messageID
+    * only: equal instances always have equal message IDs, so the contract
+    * holds regardless of how the superclass computes its own hash code.
+    */
+   @Override
+   public int hashCode()
+   {
+      // Same folding as java.lang.Long#hashCode()
+      return (int)(messageID ^ (messageID >>> 32));
+   }
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryResponseMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryResponseMessage.java	2008-08-19 19:10:48 UTC (rev 4837)
@@ -0,0 +1,100 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */ 
+
+package org.jboss.messaging.core.remoting.impl.wireformat.cluster;
+
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+
+/**
+ * 
+ * A ConsumerReplicateDeliveryResponseMessage
+ * 
+ * Packet of type REPLICATE_DELIVERY_RESPONSE whose body consists of a single
+ * long, the ID of a message. The no-argument constructor creates an empty
+ * instance whose messageID is filled in later by
+ * {@link #decodeBody(MessagingBuffer)}.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ConsumerReplicateDeliveryResponseMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Not final: the no-argument constructor leaves it at 0 until decodeBody assigns it
+   private long messageID;
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates a response packet carrying the given message ID.
+    * 
+    * @param messageID the message ID written by encodeBody
+    */
+   public ConsumerReplicateDeliveryResponseMessage(final long messageID)
+   {
+      super(REPLICATE_DELIVERY_RESPONSE);
+
+      this.messageID = messageID;
+   }
+   
+   /**
+    * Creates an empty packet; messageID stays 0 until decodeBody is called.
+    */
+   public ConsumerReplicateDeliveryResponseMessage()
+   {
+      super(REPLICATE_DELIVERY_RESPONSE);
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the message ID carried by this packet
+    */
+   public long getMessageID()
+   {
+      return messageID;
+   }
+   
+   /**
+    * Writes the body of this packet: the message ID as a single long.
+    * 
+    * @param buffer the buffer to write to
+    */
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.putLong(messageID);
+   }
+   
+   /**
+    * Reads the body of this packet: a single long stored as the message ID.
+    * 
+    * @param buffer the buffer to read from
+    */
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      messageID = buffer.getLong();
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", messageID=" + messageID + "]";
+   }
+   
+   /**
+    * Two instances are equal when the superclass considers them equal and
+    * they carry the same message ID.
+    */
+   @Override
+   public boolean equals(Object other)
+   {
+      if (!(other instanceof ConsumerReplicateDeliveryResponseMessage))
+      {
+         return false;
+      }
+            
+      ConsumerReplicateDeliveryResponseMessage r = (ConsumerReplicateDeliveryResponseMessage)other;
+      
+      return super.equals(other) && this.messageID == r.messageID;
+   }
+   
+   /**
+    * Hash code matching {@link #equals(Object)}. It is derived from messageID
+    * only: equal instances always have equal message IDs, so the contract
+    * holds regardless of how the superclass computes its own hash code.
+    */
+   @Override
+   public int hashCode()
+   {
+      // Same folding as java.lang.Long#hashCode()
+      return (int)(messageID ^ (messageID >>> 32));
+   }
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: trunk/src/main/org/jboss/messaging/core/server/CommandManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/CommandManager.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/CommandManager.java	2008-08-19 19:10:48 UTC (rev 4837)
@@ -0,0 +1,42 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.server;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.Packet;
+
+/**
+ * A CommandManager
+ * 
+ * Contract for sending command packets to a target and for acknowledging
+ * packets that have been processed:
+ *
+ * - sendCommandBlocking sends the packet to targetID and returns the response
+ *   packet; it is declared to throw MessagingException.
+ * - sendCommandOneway sends the packet to targetID without returning a
+ *   response.
+ * - packetProcessed tells the manager that the given packet has been
+ *   processed.
+ * - close releases the manager.
+ *
+ * NOTE(review): command ID assignment, resend caching and batched
+ * confirmations are behaviour of CommandManagerImpl, not something this
+ * interface itself guarantees.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface CommandManager
+{
+   Packet sendCommandBlocking(long targetID, Packet packet) throws MessagingException;
+   
+   void sendCommandOneway(long targetID, Packet packet);
+   
+   void packetProcessed(Packet packet);
+   
+   void close();
+}

Added: trunk/src/main/org/jboss/messaging/core/server/impl/CommandManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/CommandManagerImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/CommandManagerImpl.java	2008-08-19 19:10:48 UTC (rev 4837)
@@ -0,0 +1,177 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.server.impl;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
+import org.jboss.messaging.core.server.CommandManager;
+
+/**
+ * A CommandManagerImpl
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class CommandManagerImpl implements CommandManager, PacketHandler
+{
+   private static final Logger log = Logger.getLogger(CommandManagerImpl.class);
+      
+   private final java.util.Queue<Packet> resendCache = new ConcurrentLinkedQueue<Packet>();
+   
+   private final int commandBatchSize;
+   
+   //These members don't need to be synchronized since only called by one thread concurrently
+   //but do need to be volatile since that could be a different thread each time
+   
+   private volatile int lastCommandID = -1;
+   
+   private volatile int nextConfirmation;
+   
+   private volatile int commandIDSequence = 0;
+   
+   private final RemotingConnection remotingConnection;
+   
+   private final long sessionTargetID;
+   
+   private final long localCommandResponseTargetID;
+   
+   private final long remoteCommandResponseTargetID;
+   
+   public CommandManagerImpl(final int commandBatchSize, final RemotingConnection remotingConnection,
+                             final PacketDispatcher dispatcher,
+                             final long sessionTargetID,
+                             final long localCommandResponseTargetID,
+                             final long remoteCommandResponseTargetID)
+   {
+      this.commandBatchSize = commandBatchSize;
+      
+      nextConfirmation = commandBatchSize - 1;
+      
+      this.remotingConnection = remotingConnection;
+      
+      this.sessionTargetID = sessionTargetID;
+      
+      this.localCommandResponseTargetID = localCommandResponseTargetID;
+      
+      this.remoteCommandResponseTargetID = remoteCommandResponseTargetID;
+      
+      dispatcher.register(this);
+   }
+       
+   //Needs to be synchronized since on the server messages can be sent to the client same time as blocking
+   //responses are returned
+   public synchronized Packet sendCommandBlocking(final long targetID, final Packet packet) throws MessagingException
+   {
+      setCommandID(packet);
+      
+      Packet response = remotingConnection.sendBlocking(targetID, sessionTargetID, packet, this);
+        
+      if (response.getType() == PacketImpl.EXCEPTION)
+      {
+         MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
+         
+         throw mem.getException();
+      }
+      
+      return response;
+   }
+   
+   public synchronized void sendCommandOneway(final long targetID, final Packet packet)
+   {
+      setCommandID(packet);
+      
+      remotingConnection.sendOneWay(targetID, sessionTargetID, packet);
+   }
+    
+   public void packetProcessed(final Packet packet)
+   {
+      long commandID = packet.getCommandID();
+      
+      if (commandID != ++lastCommandID)
+      {
+         throw new IllegalStateException("Command id out of sequence, got " + commandID + " expected " + lastCommandID);
+      }
+      
+      if (commandID == nextConfirmation)
+      {
+         Packet confirmed = new PacketsConfirmedMessage(lastCommandID);
+               
+         confirmed.setTargetID(remoteCommandResponseTargetID);
+         
+         nextConfirmation += commandBatchSize;
+          
+         remotingConnection.sendOneWay(confirmed);
+      }
+   }
+   
+   public void close()
+   {
+      remotingConnection.getPacketDispatcher().unregister(localCommandResponseTargetID);
+   }
+         
+   // PacketHandler implementation --------------------------------------------------------------
+   
+   public long getID()
+   {
+      return localCommandResponseTargetID;
+   }
+
+   public void handle(final Object connectionID, final Packet m)
+   {      
+      PacketsConfirmedMessage msg = (PacketsConfirmedMessage)m;
+      
+      Packet packet;
+      do
+      {
+         packet = resendCache.poll();      
+      }
+      while (packet.getCommandID() < msg.getCommandID());
+   }
+   
+   // Public -----------------------------------------------------------------------------------
+   
+   public int getUnconfirmedPackets()
+   {
+      return this.resendCache.size();
+   }
+   
+   // Private -----------------------------------------------------------------------------------
+   
+   private void setCommandID(final Packet packet)
+   {
+      int commandID = commandIDSequence++;
+      
+      packet.setCommandID(commandID);
+           
+      resendCache.add(packet);
+   }
+    
+}




More information about the jboss-cvs-commits mailing list