The following is a fairly naive implementation of a Finagle protocol that uses the scodec library for binary encoding and decoding. I'll assume something like the following SBT setup:
scalaVersion := "2.10.4"
resolvers += Resolver.sonatypeRepo("snapshots")
libraryDependencies ++= Seq(
"com.twitter" %% "finagle-core" % "6.20.0",
"org.typelevel" %% "scodec-core" % "1.3.0-SNAPSHOT"
)
First for an scodec import we'll need throughout:
import scodec.Codec
Next for conversions from scodec's Codec
to Netty encoders and decoders:
trait CodecConversions {
import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.handler.codec.oneone.{OneToOneDecoder, OneToOneEncoder}
import scodec.bits.BitVector
/**
* Converts an scodec codec into a Netty encoder.
*/
protected def encoder[A: Codec] = new OneToOneEncoder {
override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: Object) =
ChannelBuffers.wrappedBuffer(
Codec.encodeValid(msg.asInstanceOf[A]).toByteBuffer
)
}
/**
* Converts an scodec codec into a Netty decoder.
*/
protected def decoder[A: Codec] = new OneToOneDecoder {
override def decode(ctx: ChannelHandlerContext, channel: Channel, msg: Object) =
msg match {
case cb: ChannelBuffer =>
Codec.decodeValidValue[A](BitVector(cb.toByteBuffer)).asInstanceOf[Object]
case other => other
}
}
}
And then channel pipeline and codec factories:
trait Factories { this: CodecConversions =>
import com.twitter.finagle.{Codec => FinagleCodec, CodecFactory}
import org.jboss.netty.channel.{ChannelPipelineFactory, Channels}
/**
* Creates a Netty channel pipeline factory given input and output types.
*/
private[this] def pipeline[I: Codec, O: Codec] = new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("encoder", encoder[I])
pipeline.addLast("decoder", decoder[O])
pipeline
}
}
/**
* Creates a Finagle codec factory given input and output types.
*/
protected def codecFactory[I: Codec, O: Codec] = new CodecFactory[I, O] {
def server = Function.const {
new FinagleCodec[I, O] { def pipelineFactory = pipeline[O, I] }
}
def client = Function.const {
new FinagleCodec[I, O] { def pipelineFactory = pipeline[I, O] }
}
}
}
And then the part that actually creates our Finagle clients and servers:
object Finagler extends Factories with CodecConversions {
import com.twitter.conversions.time._
import com.twitter.finagle.Service
import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder}
import com.twitter.util.{Duration, Future}
import java.net.InetSocketAddress
/**
* Creates a Finagle server from a function given that we have scodec codecs
* for both the input and output types.
*/
def server[I, O](port: Int)(f: I => Future[O])(implicit ic: Codec[I], oc: Codec[O]) =
ServerBuilder()
.name("server")
.codec(codecFactory[I, O])
.bindTo(new InetSocketAddress(port))
.build(new Service[I, O] { def apply(i: I) = f(i) })
/**
* Creates a Finagle client given input and output types with scodec codecs.
*/
def client[I, O](host: String, timeout: Duration = 1.second)
(implicit ic: Codec[I], oc: Codec[O]) =
ClientBuilder()
.name("client")
.codec(codecFactory[I, O])
.hosts(host)
.hostConnectionLimit(1)
.timeout(timeout)
.build()
}
And finally usage looks like this:
import scodec._, codecs._
import com.twitter.util.Future
case class Point(x: Double, y: Double)
implicit val pointCodec = (double :: double).as[Point]
implicit val pointsCodec = list(pointCodec)
def center(points: List[Point]) = {
val Point(x, y) = points.reduce[Point] {
case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
}
Point(x / points.size, y / points.size)
}
val server = Finagler.server(9000)(center _ andThen Future.value _)
And then we can create a client:
val client = Finagler.client[List[Point], Point]("localhost:9000")
And call it:
client(List(Point(0, 1), Point(1, 1), Point(1, 0), Point(0, 0))).onSuccess(println)
Which will print what we'd expect:
scala> Point(0.5,0.5)
So it seems to work in this simple use case, but I'm not a Netty expert, and I'm curious about ways this naive approach (simply piping everything through a OneToOneEncoder
, etc.) could get me into trouble.
1 Answer 1
When decoding a message (i.e. transforming received byte stream into objects), you need to keep in mind that a single socket write is not always translated into a single socket read.
For example, let's say you wrote 4 bytes in a single write attempt. On the reader's side, it can be read with 2 read attempts where each buffer contains less-than-4-byte data.
Therefore, using OneToOneDecoder
will not work when your server starts to handler more load. You need to use other decoder classes such as FrameDecoder
and ReplayingDecoder
.
For more information, please read the 'Dealing with a Stream-based Transport' section in the user guide.