Sunday, January 8, 2017

Building a REST API web service using AKKA-HTTP, showing CRUD operations backed by Redis

AKKA-HTTP is a lightweight layer that enables us to easily build a REST API on top of the Akka actor system.
In this article I will show, step by step, how to create a simple web server with a REST API service and CRUD operations. It uses Rediscala for non-blocking, asynchronous I/O against Redis, and shows different options for completing the API response and serializing entities. And, of course, how to test our REST API.

Full source code can be found here

Our web server will expose an API for handling user passwords that are kept in Redis (handling passwords like this is not good practice in real life; in production you would want to add encryption etc.).
Our REST API will expose the following functionality:
  • Register new user
  • Update user password
  • Fetch user password
  • Delete user
So, let’s get our hands dirty

First step: add dependencies to our build.sbt file

name := """akka-http-redis"""
version := "1.0"
scalaVersion := "2.11.8"
scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8")
libraryDependencies ++= {
  val akkaV       = "2.4.3"
  val scalaTestV  = "2.2.6"
  Seq(
    "com.typesafe.akka" %% "akka-actor" % akkaV,
    "com.typesafe.akka" %% "akka-stream" % akkaV,
    "com.typesafe.akka" %% "akka-http-experimental" % akkaV,
    "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaV,
    "com.typesafe.akka" %% "akka-http-testkit" % akkaV,
    "com.github.etaty" %% "rediscala" % "1.7.0",
    "org.scalatest"     %% "scalatest" % scalaTestV % "test",
    "org.scalamock" %% "scalamock-scalatest-support" % "3.4.2" % "test"
  )
}

Next — Let’s write some code.

Now we are ready to build our server.
Let’s start with a simple trait for our db with the CRUD operations that we need:
trait RedisRepoImpl extends Repo {
  def db: RedisClient
  def del(key: String): Future[Long] = db.del(key)
  // note: the expire argument is accepted but not forwarded to Redis here
  def upsert[V: ByteStringSerializer](key: String, value: V, expire: Option[Duration] = None): Future[Boolean] = db.set(key, value)
  def get(key: String): Future[Option[String]] = db.get[String](key)
}
* Note: the Redis “SET” command inserts a new key, but if the key already exists its value is overwritten, hence we use it as an “upsert” command.
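To make the “upsert” semantics concrete, here is a minimal sketch of an in-memory stand-in for the repo. It is not part of the original project: the InMemoryRepo name is hypothetical, values are simplified to plain String (no ByteStringSerializer), and only the standard library is used.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical in-memory stand-in for the Redis-backed repo (values simplified to String).
class InMemoryRepo {
  private val store = TrieMap.empty[String, String]

  // Like SET: creates the key or overwrites the existing value.
  def upsert(key: String, value: String): Future[Boolean] = {
    store.put(key, value)
    Future.successful(true)
  }

  def get(key: String): Future[Option[String]] = Future.successful(store.get(key))

  // Like DEL: returns the number of keys removed (0 or 1 here).
  def del(key: String): Future[Long] =
    Future.successful(if (store.remove(key).isDefined) 1L else 0L)
}

object InMemoryRepoDemo {
  private def await[T](f: Future[T]): T = Await.result(f, 1.second)

  // Returns (value after two upserts, first delete count, second delete count).
  def run(): (Option[String], Long, Long) = {
    val repo = new InMemoryRepo
    await(repo.upsert("alice", "first"))
    await(repo.upsert("alice", "second")) // same key: the value is overwritten
    val current = await(repo.get("alice"))
    val firstDel = await(repo.del("alice"))
    val secondDel = await(repo.del("alice"))
    (current, firstDel, secondDel)
  }
}
```

Something like this can also be handy in tests when you prefer a fake over a mock.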

UserHandler Actor

This actor will do the following:
  • Get the request from the Rest API layer
  • Communicate with the db
  • Do some “mambo jumbo” (AKA logic) and reply to the Rest API Layer
Note that we can send the result of a Future directly to an actor using the pipeTo pattern. All you need is to import akka.pattern.pipe and make sure an ExecutionContext is in scope. Let’s see that in action (code):
object UserHandler {
  def props(db: Repo): Props = Props(new UserHandler(db))
  case class User(username: String, details: String)
  case class Register(username: String, password: String)
  case class Update(username: String, details: String)
  case class GetUser(username: String)
  case class DeleteUser(username: String)
  case class UserNotFound(username: String)
  case class UserDeleted(username: String)
}
// Simple DI through the constructor. Of course you can use any DI framework
// (e.g. Guice, Spring ...) or DI pattern (e.g. the cake pattern),
// but this is out of scope of this post.
class UserHandler(db: Repo) extends Actor with ActorLogging {
  import UserHandler._
  implicit val ec = context.dispatcher
  override def receive: Receive = {
    case Register(id, pwd) =>
      db.upsert(id, pwd) pipeTo sender()
    case Update(id, details) =>
      db.upsert(id, details) pipeTo sender()
    case GetUser(userName) =>
      // closing over sender() inside a Future callback is not safe, so capture it first
      val requestor = sender()
      db.get(userName).foreach {
        case Some(i) => requestor ! User(userName, i)
        case None => requestor ! UserNotFound(userName)
      }
    case DeleteUser(userName) =>
      val requestor = sender()
      db.del(userName).foreach {
        case affectedRows if affectedRows > 0 =>
          requestor ! UserDeleted(userName)
        case _ => requestor ! UserNotFound(userName)
      }
  }
}
Now we are finally ready for

Writing REST API using AKKA-HTTP

Writing a REST API with akka-http is simple: all we need to do is create Routes, i.e. add the paths that we want and unmarshal the request body. To achieve that I’ll create some case classes that represent the request body, and I’ll use spray-json to de/serialize JSON into a case class.
case class UserPwd(pwd:String)
case class UpsertRequest(username:String, password:String )
trait Protocols extends DefaultJsonProtocol {
  implicit val delUserFormat = jsonFormat1(UserDeleted.apply)
  implicit val uNotFoundFormat = jsonFormat1(UserNotFound.apply)
  implicit val usrFormat = jsonFormat2(User.apply)
  implicit val userLogin = jsonFormat2(UpsertRequest.apply)
}

Serializing the request

Let’s start with the registration. Registering a new user does not require authentication and is exposed as PUT /api/user/register. Note how easily we can deserialize the request body into a case class. The code is quite self-explanatory:
val unsecuredRoutes: Route = {
    // "api" / "user" matches two path segments; a "/" inside a single string would only match an encoded %2F
    pathPrefix("api" / "user") {
      path("register") {
          put {
            entity(as[UpsertRequest]) { u =>
              complete {
                (userHandler ? UserHandler.Register(u.username, u.password)).map {
                  case true => OK -> s"Thank you ${u.username}" //plain text response
                  case _ => InternalServerError -> "Failed to complete your request. please try later"
                }
              }
            }
          }
      }
    }
}


The other operations (delete user, update password) require authentication. We will use simple basic user/password authentication (note that this implementation is naive, because every authenticated user can execute these operations on any user, but it will do for the demo).
So we need to create the function that will authenticate the requestor, e.g.
def userAuthenticate(credentials: Credentials): Future[Option[User]]
Now we can use Akka's basic authentication support to wrap the inner route.

A word regarding the path matching

Akka-HTTP has a rich DSL for path matching; it is very powerful and convenient to use. For example, note the use of the ~ operator to chain inner routes that share the same prefix (“/user”), and the “Segment” matcher that enables us to capture the text following the “/user/” path. For example,
using this GET request
curl -i -X GET \
with this path directive
path(Segment) { id =>
will bind the value (123) captured by Segment to the “id” variable

Responding to requestor

Since we defined the serialization protocol, our case class response will be converted to a JSON response.
We can take the “Future” that we got from the handler and map it to our case class using mapTo. In the following example I will show two types of responses: JSON and plain text.
* This is not good practice in production. Keep your API consistent with a single response type (e.g. JSON).
val routes: Route =
    logRequestResult("akka-http-secured-service") {
      authenticateBasicAsync(realm = "secure site",
                                    userAuthenticate) { user =>
        pathPrefix("user") {
          path(Segment) { id =>
              get {
                complete {
                  //Response with JSON
                  (userHandler ? GetUser(id)).mapTo[User]
                }
              }
            } ~
            path(Segment) { id =>
              post {
                entity(as[UpsertRequest]) { u =>
                  complete {
                    //Response with Plain Text
                    (userHandler ? UserHandler.Update(u.username,
                                                  u.password)).map {
                      case false =>
                       InternalServerError -> s"Could not update user $id"
                      case _ =>
                            NoContent -> ""
                    }
                  }
                }
              }
            }
        }
      }
    }
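One thing worth keeping in mind with mapTo: the handler can also reply with UserNotFound, and mapTo[User] then fails the Future instead of producing a response. The following standard-library-only sketch illustrates the difference. fakeAsk is a hypothetical stand-in for the actor ask (which also yields a Future[Any]), and the case classes are redefined so the snippet stands on its own.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object MapToSketch {
  case class User(username: String, details: String)
  case class UserNotFound(username: String)

  // Hypothetical stand-in for `userHandler ? GetUser(id)`: the reply is untyped.
  def fakeAsk(id: String): Future[Any] =
    if (id == "alice") Future.successful(User("alice", "secret"))
    else Future.successful(UserNotFound(id))

  // mapTo[User] only succeeds when the reply really is a User.
  def strict(id: String): Try[User] =
    Try(Await.result(fakeAsk(id).mapTo[User], 1.second))

  // Matching on the reply handles both outcomes explicitly.
  def lenient(id: String): Either[String, User] =
    Await.result(fakeAsk(id).map[Either[String, User]] {
      case u: User            => Right(u)
      case UserNotFound(name) => Left(s"user $name not found")
    }, 1.second)
}
```

In a route, the second style lets you answer a missing user with a 404 rather than a failed request.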
The rest of the code can be found here

The Server — Main

Now we need to bind our routes to an HTTP host and port
object AkkaHttpRedisService extends App with Service {
  override implicit val system = ActorSystem()
  override implicit val executor = system.dispatcher
  override implicit val materializer = ActorMaterializer()
  override val config = ConfigFactory.load()
  override val logger = Logging(system, getClass)
  val userHandler = system.actorOf(UserHandler.props)
  Http().bindAndHandle(unsecuredRoutes ~ routes , config.getString("http.interface"), config.getInt("http.port"))
}


Everything should be tested, but to keep this post from getting too long we will focus on testing the REST API.
Since we want to keep our unit tests pure we can mock our db. For that we will use ScalaMock and mix in the “MockFactory” trait.
Akka supplies the ScalatestRouteTest trait, which we can mix in with our test framework to test any aspect of the response, for example:
class ServiceSpec extends FlatSpec
  with Matchers
  with ScalatestRouteTest
  with Service
  with MockFactory {
  val repo: Repo = stub[Repo]
  val userHandler = TestActorRef[UserHandler](new UserHandler(repo))
  "Registration Service" should "add user" in {
    val userName = "newuser"
    val pwd = "123pwd"
    //mock the response from the db
    (repo.upsert[String](_: String, _: String, _: Option[Duration])(_: ByteStringSerializer[String]))
      .when(userName, pwd, None, *) returns Future.successful(true)
    //route call
    Put(s"/api/user/register", UpsertRequest(userName, pwd)) ~>
      unsecuredRoutes ~> check {
      status shouldBe OK
      responseAs[String] shouldBe s"Thank you $userName"
    }
  }
  "Secured service" should "get user" in {
    repo.get _ when validUser returns Future.successful(Some(pwd))
    Get(s"/user/$validUser") ~> addCredentials(userCredentials) ~>
      routes ~> check {
      status shouldBe OK
      contentType shouldBe `application/json`
      responseAs[User] shouldBe User(validUser,pwd)
    }
  }
}
That’s it !
Full source code can be found here
Hope you enjoyed. Your comments and inputs are greatly appreciated .
You are welcome to contribute to the code

Thursday, June 2, 2016

Simple and useful Zip and unZip text

Simple and useful textual zip and unzip.
First we zip to binary. But if we want to send the result as a text message (e.g. some message buses support only text messages) we need to convert it to text by encoding it as Base64, and do the reverse to unzip it.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPOutputStream, GZIPInputStream}
import org.apache.commons.codec.binary.Base64

import scala.util.Try
  // GZIP the UTF-8 bytes of the text
  val zipToBinary: String => Array[Byte] = {txt =>
    val arrOutputStream = new ByteArrayOutputStream()
    val zipOutputStream = new GZIPOutputStream(arrOutputStream)
    zipOutputStream.write(txt.getBytes(StandardCharsets.UTF_8))
    zipOutputStream.close() // closing writes the GZIP trailer
    arrOutputStream.toByteArray
  }

  val byteArrayToTxt : Array[Byte] => String = bytes => Base64.encodeBase64String(bytes)

  val txtToBinary : String => Array[Byte]= txt => Base64.decodeBase64(txt.getBytes(StandardCharsets.UTF_8))

  // gunzip and decode the bytes back to text
  val binaryToTxt : Array[Byte] => String = bytes => scala.io.Source.fromInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))(StandardCharsets.UTF_8).mkString

  val zip:String => String = zipToBinary andThen byteArrayToTxt

  val unzip :String => String = txtToBinary andThen binaryToTxt

Usage :
val someLongTxt = "One morning, when Gregor Samsa woke from troubled dreams, he found himself transformed in his bed into a horrible vermin. He lay on his armour-like back, and if he lifted his head a little he could see his brown belly, slightly domed and divided by arches into stiff sections. The bedding was hardly able to cover it and seemed ready to slide off any moment. His many legs, pitifully thin compared with the size of the rest of him, waved about helplessly as he looked. \"What's happened to me?\" he thought. It wasn't a dream. His room, a proper human room although a little too small, lay peacefully between its four familiar walls. A collection of textile samples lay spread out on the table - Samsa was a travelling salesman - and above it there hung a picture that he had recently cut out of an illustrated magazine and housed in a nice, gilded frame. It showed a lady fitted out with a fur hat and fur boa who sat upright, raising a heavy fur muff that covered the whole of her lower arm towards the viewer. Gregor then turned to look out the window at the dull weather. Drops"
assert (unzip(zip(someLongTxt)) == someLongTxt)
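A quick way to see what the Base64 step costs: padded Base64 emits 4 characters for every 3 input bytes, so the text form is about a third larger than the zipped bytes. The sketch below checks that arithmetic. It is only an illustration and uses the JDK's java.util.Base64 instead of commons-codec so that it needs no extra dependency; the ZipSizes name is made up for this example.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.util.Base64
import java.util.zip.GZIPOutputStream

object ZipSizes {
  // GZIP the UTF-8 bytes of the text (same idea as zipToBinary above).
  def gzip(txt: String): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(out)
    gz.write(txt.getBytes(StandardCharsets.UTF_8))
    gz.close() // closing writes the GZIP trailer
    out.toByteArray
  }

  // java.util.Base64 (JDK 8+) is an assumption here; the post itself uses commons-codec.
  def base64(bytes: Array[Byte]): String = Base64.getEncoder.encodeToString(bytes)

  // Padded Base64 length: 4 characters per group of 3 bytes, rounded up.
  def expectedBase64Length(byteCount: Int): Int = 4 * ((byteCount + 2) / 3)
}
```

For repetitive text the zipped and encoded form is still much shorter than the input, which is the whole point of zipping before sending.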

Thursday, March 24, 2016

Playing with xml

XML is a convenient way to define an external DSL. In one of my previous posts (Filtering using Scala Parser Combinators) I used the parser combinators JavaTokenParsers with PackratParsers. This time I will take a different approach, using XML.
XML is easy for non-developers to handle and easy to read. We can query XML using XPath-like expressions and pattern matching. In this post we will define some business rules using XML and treat our XML as a first-class citizen. Source code can be found here.
Let's define our DSL:
Our rules will use AND/OR operators, but to make it more interesting our operators will not use the following pattern:
<expression><operator><expression> (where expression evaluates to boolean).
Because XML has opening and closing tags we can define something like:
<operator><expression1><expression2>...<expression N></operator>
Now we can define our AND/OR operators:
  • AND: a set of expressions that must all be true
  • OR: a set of expressions of which at least one must be true
  • SINGLE: contains our value and the operator used to evaluate the expression.
For example, we want our condition to look something like this:
 val simpleRuleXml =
      <RULE id="1" description="5 top" status="true">
        <CONDITIONS>
          <OR>
            <SINGLE operator="eq"> 2 </SINGLE>
            <SINGLE operator="eq"> 4 </SINGLE>
            <SINGLE operator="eq"> 8 </SINGLE>
            <AND>
              <SINGLE operator="gt"> 25 </SINGLE>
              <SINGLE operator="st"> 100 </SINGLE>
            </AND>
          </OR>
        </CONDITIONS>
      </RULE>
That means that our value must be one of the following: 2, 4, 8, or a value between 25 and 100.
Let's start by defining our value objects
sealed trait Expression

case class Or(ps: Seq[Expression]) extends Expression

case class And(ps: Seq[Expression]) extends Expression

case class Single(operand:String, operator:String) extends Expression
Working with XML allows us to navigate between the nodes and to parse them very easily, and recursively, using pattern matching.
For example, this expression matches all nodes inside TAG and binds them to the variable xs, using a wildcard _ and a Kleene star *, which together mean "match any sequence": <TAG>{ xs @ _* }</TAG>
object Expression{
  def apply(ns: NodeSeq):Expression ={
    ns.head match  {
      case <CONDITIONS>{ xs @ _* }</CONDITIONS> => Expression(xs)
      case <AND>{ xs @ _* }</AND> => And(xs map(Expression(_)))
      case <OR>{ xs @ _* }</OR> => Or(xs map(Expression(_)))
      // the XML keeps the comparison in the "operator" attribute
      case <SINGLE>{ s @ _* }</SINGLE> => Single(s.text, ns \@ "operator")
    }
  }
}
That's it! Well, almost... we still need to evaluate our rule and check whether our conditions are met.
object Rule{
  def apply(node: Seq[xml.Node]): Rule = {
    val id =node\s"@id"
    val desc =node\s"@description"
    val conditions = (node\\"CONDITIONS").map(Expression(_))
    Rule.apply(id.text,desc.text,conditions )
  }
}
A rule is mainly a set of conditions that need to be met. As described above, all expressions between the AND tags must be true,
and at least one expression between the OR tags must be true.

case class Rule(id:String,desc:String,conditions:Seq[Expression]){
  def isValid[T:Ordering](value:T)(implicit f:String => T):Boolean = {
    def run (pr:Expression):Boolean = {
      pr match {
        case Single(name,operator) =>
          import scala.math.Ordering.Implicits._
            val n = f(name)
            operator match {
              case "eq" => value == n
              case "gt" => value > n
              case "st" => value < n
            }
        case And(ps) => ps forall run
        case Or(ps) => ps exists run
      }
    }
    conditions forall run
  }
}
and we are done !
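To make the evaluation concrete without any XML, here is a small self-contained sketch. It restates the three value objects, builds by hand the tree for "2 or 4 or 8 or between 25 and 100", and evaluates it. The RuleSketch object and its eval function are illustrative only; they mirror the idea of isValid but are fixed to Int to keep the example short.

```scala
object RuleSketch {
  sealed trait Expression
  case class Or(ps: Seq[Expression]) extends Expression
  case class And(ps: Seq[Expression]) extends Expression
  case class Single(operand: String, operator: String) extends Expression

  // Same evaluation idea as isValid, fixed to Int for brevity.
  def eval(e: Expression, value: Int): Boolean = e match {
    case Single(operand, "eq") => value == operand.trim.toInt
    case Single(operand, "gt") => value > operand.trim.toInt
    case Single(operand, "st") => value < operand.trim.toInt
    case Single(_, other)      => sys.error(s"unknown operator $other")
    case And(ps)               => ps.forall(eval(_, value))
    case Or(ps)                => ps.exists(eval(_, value))
  }

  // Hand-built equivalent of "2 or 4 or 8 or between 25 and 100".
  val rule: Expression = Or(Seq(
    Single("2", "eq"), Single("4", "eq"), Single("8", "eq"),
    And(Seq(Single("25", "gt"), Single("100", "st")))
  ))
}
```

Note that "gt" and "st" are strict comparisons, so the boundary values 25 and 100 themselves do not satisfy the rule.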

For completeness, let's define some implicits so we can convert the value in the XML, which is a String, to T.
object Implicits {
  // String itself has no toInt; calling augmentString explicitly selects the StringOps version
  // instead of letting the compiler pick between these implicit conversions
  implicit def string2Int(s: String): Int = augmentString(s).toInt
  implicit def string2Double(s: String): Double = augmentString(s).toDouble
  implicit def string2Float(s: String): Float = augmentString(s).toFloat
}

Usage (using the above XML):
val trimmedNode = scala.xml.Utility.trim(simpleRuleXml)

    val v = Rule(trimmedNode)
Hope you enjoyed it. The complete source code can be found here. Feedback and remarks are always welcome.

Sunday, November 22, 2015

Presenting the File Tracker

The goal of this project is to track changes in files and manage those changes as byte arrays, asynchronously, using Akka actors and the Java NIO library.
This is done by registering a directory with the WatchService and filtering the files using a PathMatcher. For each change in a file the requester receives the byte array reflecting that change.
Currently this project supports only additions to a file, i.e. deleting characters from a file is not supported.
The Complete Source Code can be found here

Let's dive in.

The Ingredients :

FileSyncIo .

This part is copied from the FileAsyncIo project with some adjustments, and it is very handy for reading files asynchronously.
To read the file we use the Java NIO AsynchronousFileChannel. Since we are only reading the file, we open the channel with the READ option. The read method accepts a buffer, the start position and a handler:
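  val p = Promise[Array[Byte]]()
  val buffer = ByteBuffer.allocate(channel.size().toInt)
  // the buffer is passed twice: as the destination and as the attachment handed to the handler
  channel.read(buffer, position, buffer, onComplete(channel, p))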
I really like this implementation of the handler, which uses a promise: when the read completes, the handler completes the promise with the byte array, so the caller simply consumes a Future
private def onComplete(channel: AsynchronousFileChannel, p: Promise[Array[Byte]]) = {
    new CompletionHandler[Integer, ByteBuffer]() {
      def completed(res: Integer, buffer: ByteBuffer) {
        p.complete(Try {
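For readers who want to try the promise-based handler in isolation, here is a minimal, self-contained sketch of the same pattern. It is an assumption-laden illustration rather than the project's code: the AsyncReadSketch and readFrom names are made up, and it issues a single read call, whereas a production version would loop until the buffer is full.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.file.{Path, StandardOpenOption}
import scala.concurrent.{Future, Promise}

object AsyncReadSketch {
  // Reads from `position` to the current end of the file with one asynchronous read.
  def readFrom(path: Path, position: Long): Future[Array[Byte]] = {
    val p = Promise[Array[Byte]]()
    val channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ)
    val size = math.max(0L, channel.size() - position).toInt
    val buffer = ByteBuffer.allocate(size)
    channel.read(buffer, position, buffer, new CompletionHandler[Integer, ByteBuffer] {
      def completed(res: Integer, attachment: ByteBuffer): Unit = {
        channel.close()
        // res is the number of bytes read, or -1 at end of file
        p.success(attachment.array().take(math.max(0, res.intValue)))
      }
      def failed(t: Throwable, attachment: ByteBuffer): Unit = {
        channel.close()
        p.failure(t)
      }
    })
    p.future
  }
}
```

The caller never sees the callback; it just gets a Future[Array[Byte]] that it can map over or pipe to an actor.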


The watch service actor uses the WatchService to register a directory and receive create, modify and delete events.
  path.register(watchService, Array(ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY))
The WatchServiceActor reads the events and notifies the registered actor. In order to avoid sacrificing a thread the watchservice actor sends a message to itself periodically
def monitor: Receive = {
    case Poll =>
      val key = watchService.take()
      key.pollEvents() foreach { event =>
        val relativePath = event.context.asInstanceOf[Path]
        val path = contextAbsolutePath(key, relativePath)
        event.kind match {
          case kind: WatchEvent.Kind[_] if monitoredTypes.contains(kind) =>
            notifyActor ! EventOccured(kind, path)
          case _ => // do nothing
        }
      }
      context.system.scheduler.scheduleOnce(500 millis, self, Poll)
    case Interrupt =>


The FileMonitorActor's purpose is to process the byte stream received from the file and send it to the requester. Its constructor accepts the directory to monitor and a pattern that identifies which files we want to track. Although the WatchService can handle an arbitrary number of directories, I chose to keep one FileMonitorActor per directory (and pattern to filter the monitored files), each with a single WatchServiceActor; I found this easier to manage. To filter the desired files the FileMonitorActor uses the PathMatcher and accepts a text pattern as a glob (see this link). The actor registers with the watch service in order to be notified about events. To monitor the files and their changes, it uses two mutable collections
val filePos = new mutable.HashMap[Path, Long]
val fileQueue = new mutable.HashMap[Path, mutable.Queue[Array[Byte]]] 
  • the first one preserves the last position that we read from each file.
  • the second maps each file to a queue of byte arrays, one per event. Once requested, it dequeues the bulk of bytes that reflects the change (FIFO) and sends it to the requester.
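The interplay of the two collections can be sketched without actors. The class below is a simplified, hypothetical illustration (AppendTracker, process and nextBulk here are stand-ins, not the project's implementation): it remembers the last position per file, queues only the newly appended bytes, and hands them out in FIFO order. For brevity it reads the whole file synchronously, which the real project avoids.

```scala
import java.nio.file.{Files, Path}
import scala.collection.mutable

class AppendTracker {
  private val filePos = new mutable.HashMap[Path, Long]
  private val fileQueue = new mutable.HashMap[Path, mutable.Queue[Array[Byte]]]

  // Queues whatever was appended to the file since the last call.
  def process(path: Path): Unit = {
    val all = Files.readAllBytes(path) // simple, but not efficient for big files
    val from = filePos.getOrElse(path, 0L).toInt
    if (all.length > from) {
      fileQueue.getOrElseUpdate(path, mutable.Queue.empty[Array[Byte]]).enqueue(all.drop(from))
      filePos(path) = all.length.toLong
    }
  }

  // Hands out the oldest pending change for the file, if any (FIFO).
  def nextBulk(path: Path): Option[Array[Byte]] =
    fileQueue.get(path).filter(_.nonEmpty).map(_.dequeue())
}
```

Because only positions beyond the stored offset are considered, this also shows why deletions inside a file are not picked up by this approach.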
The file monitor creates a WatchServiceActor in its constructor and subscribes to change events
watchActor = Some(context.actorOf(WatchServiceActor(self)))
and sends watch request to the WatchServiceActor
watchActor foreach (_ ! Watch(dir))
Since the WatchService monitors all changes in the directory, the FileMonitorActor receives change events from the watch service for all the files and keeps only the relevant ones using the PathMatcher
case EventOccured(event, path) if matcher.matches(path.getFileName) =>
      event match {
        case ENTRY_CREATE => addFileToQueue(path)
        case ENTRY_MODIFY => process(path)
        case ENTRY_DELETE => removeFileFromQueue(path)
      }
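If you have not used PathMatcher before, this tiny sketch shows how a glob pattern is applied to a file name. The GlobSketch object and the "*.txt" pattern are just examples; the project passes in whatever pattern the caller supplies.

```scala
import java.nio.file.{FileSystems, Paths}

object GlobSketch {
  // "glob:" selects glob syntax; "regex:" is the other syntax the JDK supports.
  private val matcher = FileSystems.getDefault.getPathMatcher("glob:*.txt")

  // Match on the file name only, so the directory part does not affect the glob.
  def isTracked(file: String): Boolean =
    matcher.matches(Paths.get(file).getFileName)
}
```

This is why the guard above calls path.getFileName before handing the path to the matcher.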

FileMonitoringAggregatorActor .

This guy manages the FileMonitorActor actors and aggregates the changes from all monitors. It keeps a buffer that contains all changes from the monitors. Once requested, it sends the requester all accumulated changes and clears the buffer (the buffer can grow without bound if it is not consumed).
This actor accepts a list of directory and pattern pairs
case class PathPattern(path:String,pattern:String)
It spawns a FileMonitorActor per directory and keeps a map from path to actor (this is useful for adding and removing paths and prevents creating duplicates).
  val monitorActors = paths.collect {
    case p:PathPattern if Files.exists(p.path) =>
    p.path -> context.actorOf(FileMonitorActor(p.path, p.pattern), p.path)
  }(collection.breakOut):mutable.HashMap[String, ActorRef]
In order to start monitoring, it accepts an Init message with a boolean flag that determines whether we want to track existing files in the directory or just new ones. It also uses a periodic message to request the changes, and it accumulates the answers in a buffer. Once
case object GetNextBulks
is received, it returns to the requester all changes accumulated since the last request
    case GetNextBulks =>
      sender ! bulksBuffer.toList
The FileMonitoringAggregatorActor manages the FileMonitorActors. When it gets an AddPath command it simply adds another FileMonitorActor for that path. When it gets a RemovePath command it simply sends a Stop message to that actor.
  def monitoring: Receive = {

    case GetNextBulks =>
      log.info("Sending back "+bulksBuffer)
      sender ! bulksBuffer.toList
    case RequestNextBulk =>
      monitorActors.values foreach (_ ! GetBulk)
      context.system.scheduler.scheduleOnce(500 millis, self, RequestNextBulk)
    case bs: ByteBulks =>
      bulksBuffer += bs
    case Stop =>
      monitorActors.values foreach (_ ! Stop)
      context become ready
    case AddPath(p,i)=>
      if (monitorActors.contains(p.path))
        log.warning(s"Request Add ${p.path} is redundant because it is already monitored ")
      else if (Files.exists(p.path)) {
        val m = context.actorOf(FileMonitorActor(p.path, p.pattern), removeSlashes(p.path))
        monitorActors += p.path -> m
        m ! Init(i)
      } else
        log.error(s"Cannot add path. Reason: Directory ${p.path} does not exist")
    case RemovePath(p) =>
      monitorActors.find(_._1 == p) match {
        case Some(a) =>
          log.info(s"Remove ${a._1} from monitor")
          a._2 ! Stop
          monitorActors -= a._1
        case None => log.warning(s"Cannot remove $p. Reason: Not Found ")
      }
  }


We can use the WatchService to register a directory and be notified of any file change in that directory. A PathMatcher is useful for keeping only the paths/files we care about. In this project we use Akka actors to keep operations non-blocking and to hold the state of the positions and the changes as byte arrays. The FileMonitoringAggregatorActor returns all changes accumulated since the last request (i.e. the GetNextBulks message).
object ApplicationMain extends App {
  val system = ActorSystem("MyActorSystem")

  val pattern = "*.txt"
  val pathPattern1 = PathPattern("/tmp",pattern)
  val pathPattern2 = PathPattern("/home/avi/Downloads",pattern)
  implicit val timeout = Timeout(10 seconds)

  val monitorActor = system.actorOf(FileMonitoringAggregatorActor(List(pathPattern1,pathPattern2)))
  monitorActor ! Init(true)
}
Now start making changes; you can see them reflected in the log file. You can also send a request message to see them
val changes = ask(monitorActor , GetNextBulks).mapTo[List[Bulks]]
Hope you enjoyed it.
The complete source code can be found here. Feedback and remarks are always welcome.

Acknowledgments : This project was inspired by: