[infinispan-commits] Infinispan SVN: r1497 - in trunk/server: memcached/src/main/java/org/infinispan/server/memcached and 2 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Feb 11 07:36:04 EST 2010
Author: galder.zamarreno at jboss.com
Date: 2010-02-11 07:36:03 -0500 (Thu, 11 Feb 2010)
New Revision: 1497
Added:
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/netty/
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/netty/NettyMemcachedDecoder.java
trunk/server/pom.xml
Log:
- Added server-parent pom
- Forgot to commit refactored decoder class
Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/netty/NettyMemcachedDecoder.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/netty/NettyMemcachedDecoder.java (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/netty/NettyMemcachedDecoder.java 2010-02-11 12:36:03 UTC (rev 1497)
@@ -0,0 +1,149 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached.transport.netty;
+
+import static org.infinispan.server.memcached.TextProtocolUtil.CR;
+import static org.infinispan.server.memcached.TextProtocolUtil.CRLF;
+import static org.infinispan.server.memcached.TextProtocolUtil.LF;
+import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
+
+import java.io.IOException;
+import java.io.StreamCorruptedException;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.infinispan.Cache;
+import org.infinispan.server.core.Command;
+import org.infinispan.server.core.InterceptorChain;
+import org.infinispan.server.memcached.Reply;
+import org.infinispan.server.memcached.UnknownCommandException;
+import org.infinispan.server.memcached.commands.CommandFactory;
+import org.infinispan.server.memcached.commands.StorageCommand;
+import org.infinispan.server.memcached.commands.TextCommand;
+import org.infinispan.server.memcached.commands.Value;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
+
+/**
+ * NettyMemcachedDecoder.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class NettyMemcachedDecoder extends ReplayingDecoder<NettyMemcachedDecoder.State> {
+   private static final Log log = LogFactory.getLog(NettyMemcachedDecoder.class);
+
+   private final CommandFactory factory;
+   // Command whose header line has been parsed; for storage commands it is kept here
+   // until the data block that follows the header has been read.
+   private volatile TextCommand command;
+
+   /**
+    * Decoding phases: reading a command line, or reading the data block that follows
+    * the command line of a storage command.
+    */
+   protected enum State {
+      READ_COMMAND, READ_UNSTRUCTURED_DATA
+   }
+
+   /**
+    * Creates a decoder that starts in {@link State#READ_COMMAND} and builds its commands
+    * through a {@link CommandFactory} constructed from the given arguments.
+    *
+    * @param cache     passed to the command factory
+    * @param chain     passed to the command factory
+    * @param scheduler passed to the command factory
+    */
+   public NettyMemcachedDecoder(Cache<String, Value> cache, InterceptorChain chain, ScheduledExecutorService scheduler) {
+      super(State.READ_COMMAND, true);
+      factory = new CommandFactory(cache, chain, scheduler);
+   }
+
+   /**
+    * Decodes one step of the text protocol.
+    *
+    * @return the decoded command, or {@code null} when only the header line of a storage
+    *         command has been consumed and its data block is still to be read
+    * @throws StreamCorruptedException if the data block is not followed by CR LF
+    * @throws IOException              if the storage command rejects the data block
+    */
+   @Override
+   protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, State state) throws Exception {
+      switch (state) {
+         case READ_COMMAND:
+            command = factory.createCommand(readLine(buffer));
+            if (!command.getType().isStorage()) {
+               return command;
+            }
+            // Remember that the header is consumed so a replay restarts at the data block.
+            checkpoint(State.READ_UNSTRUCTURED_DATA);
+            break;
+         case READ_UNSTRUCTURED_DATA:
+            StorageCommand storageCmd = (StorageCommand) command;
+            byte[] data = new byte[storageCmd.getParams().getBytes()];
+            buffer.readBytes(data, 0, data.length);
+            // Short-circuit keeps the original read pattern: the second byte is only
+            // consumed when the first one is CR.
+            if (buffer.readByte() != CR || buffer.readByte() != LF) {
+               // Go back to command mode, otherwise the following bytes would be
+               // swallowed as another data block for this already-failed command.
+               abandonCommand();
+               // CR LF is escaped: the message is echoed to the client inside a single
+               // reply line and must not contain a real line break.
+               throw new StreamCorruptedException("Expecting \\r\\n after data block");
+            }
+            try {
+               return reset(storageCmd.setData(data));
+            } catch (IOException ioe) {
+               abandonCommand();
+               throw ioe;
+            }
+      }
+      return null;
+   }
+
+   /**
+    * Logs the failure and answers the client with a single reply line: the ERROR reply for
+    * unknown commands, CLIENT_ERROR for I/O failures, SERVER_ERROR for anything else.
+    */
+   @Override
+   public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+      Throwable t = e.getCause();
+      log.error("Unexpected exception", t);
+      Channel ch = ctx.getChannel();
+      byte[] reply;
+      if (t instanceof UnknownCommandException) {
+         reply = Reply.ERROR.bytes();
+      } else if (t instanceof IOException) {
+         reply = describe(Reply.CLIENT_ERROR, t);
+      } else {
+         reply = describe(Reply.SERVER_ERROR, t);
+      }
+      ch.write(wrappedBuffer(wrappedBuffer(reply), wrappedBuffer(CRLF)));
+   }
+
+   /**
+    * Renders {@code "<kind> <throwable>"} as bytes. An explicit charset is used so the
+    * bytes on the wire do not depend on the platform default encoding.
+    */
+   private static byte[] describe(Object kind, Throwable t) throws IOException {
+      StringBuilder sb = new StringBuilder();
+      sb.append(kind).append(' ').append(t);
+      return sb.toString().getBytes("UTF-8");
+   }
+
+   /** Forgets the command in progress, moves back to command mode and hands back {@code c}. */
+   private Object reset(Command c) {
+      this.command = null;
+      checkpoint(State.READ_COMMAND);
+      return c;
+   }
+
+   /** Drops the command in progress after a failure and moves back to command mode. */
+   private void abandonCommand() {
+      this.command = null;
+      checkpoint(State.READ_COMMAND);
+   }
+
+   /**
+    * Reads bytes up to and including the next line terminator and returns the line
+    * without the terminator. Both CR LF and a bare LF end the line. A CR that is not
+    * followed by LF is kept as part of the line, and so is the byte after it.
+    * Each byte maps to the character with the same unsigned value.
+    */
+   private String readLine(ChannelBuffer buffer) {
+      StringBuilder sb = new StringBuilder(64);
+      boolean pendingCr = false;
+      while (true) {
+         byte next = buffer.readByte();
+         if (next == LF) {
+            // Terminator reached; a pending CR belongs to it and is not part of the line.
+            return sb.toString();
+         }
+         if (pendingCr) {
+            // The previous CR was not followed by LF, so it was ordinary data.
+            sb.append((char) CR);
+         }
+         pendingCr = next == CR;
+         if (!pendingCr) {
+            // Mask to avoid sign extension of bytes >= 0x80.
+            sb.append((char) (next & 0xFF));
+         }
+      }
+   }
+
+}
Added: trunk/server/pom.xml
===================================================================
--- trunk/server/pom.xml (rev 0)
+++ trunk/server/pom.xml 2010-02-11 12:36:03 UTC (rev 1497)
@@ -0,0 +1,31 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <!-- Inherits from infinispan-parent, located through the relative path below. -->
+ <parent>
+ <groupId>org.infinispan</groupId>
+ <artifactId>infinispan-parent</artifactId>
+ <version>4.0.0-SNAPSHOT</version>
+ <relativePath>../parent/pom.xml</relativePath>
+ </parent>
+ <artifactId>infinispan-server-parent</artifactId>
+ <name>Parent pom for server modules</name>
+ <!-- Packaging "pom": this module produces no artifact of its own. -->
+ <packaging>pom</packaging>
+
+ <!-- Declared under dependencies (not dependencyManagement), so modules that use this
+      POM as their parent inherit both entries. -->
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>infinispan-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Same artifact again, but its test-jar, and only on the test classpath. -->
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>infinispan-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
More information about the infinispan-commits
mailing list