Monday, April 20, 2015

Calculating an average in Spark Streaming not working: issue with updateStateByKey and instantiating a class

I want to read a stock ticker and a value, entered through the Linux nc command, in a Spark Streaming context and return an average every minute or so.

Values are entered in the format (String, Double), e.g. USD 20, with one ticker and one value per line. My plan is to read these values and instantiate a Visit class with them, then use that class (together with a VisitState class that holds the count and the running average) in my updateStateByKey streaming method.
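Since Visit is a case class, it is instantiated without new: Visit(label, value) calls the companion object's apply method. Below is a minimal sketch of turning one "USD 20" line into a Visit, assuming marketValue is a Double (the incoming values are Doubles). VisitParsing and parseVisit are hypothetical names, not part of Spark.

```scala
import scala.util.Try

// Assumption: marketValue is a Double, matching the (String, Double) input.
case class Visit(label: String, marketValue: Double)

object VisitParsing {
  // Parses one "TICKER VALUE" line (e.g. "USD 20") into a Visit.
  // Returns None when the line does not have exactly two fields
  // or the second field is not a number.
  def parseVisit(line: String): Option[Visit] =
    line.trim.split("\\s+") match {
      case Array(label, value) =>
        // No `new` needed: Visit(...) calls the case class companion's apply
        Try(value.toDouble).toOption.map(v => Visit(label, v))
      case _ => None
    }
}
```

Inside a stream this could be applied per line, for instance lines.flatMap(line => VisitParsing.parseVisit(line).toList), which drops malformed lines instead of failing the whole batch.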

Here is what I've tried:

    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

    // Create a ReceiverInputDStream on target ip:port that receives
    // \n-delimited text (e.g. generated by 'nc'), one "TICKER VALUE" pair per line
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream(args(0), args(1).toInt)

    // Split each line (not each word) so both fields stay together: "USD 20" -> ("USD", 20.0)
    val wordDstream: DStream[(String, Double)] =
      lines.map(_.trim.split("\\s+")).map(x => (x(0), x(1).toDouble))

    // updateStateByKey keeps state across batches, so a checkpoint directory is required
    // ("checkpoint" is a placeholder path)
    ssc.checkpoint("checkpoint")

    // Update function that computes the new state of each key after every batch
    val update = (vals: Seq[Double], state: Option[VisitState]) => {
      val prev = state.getOrElse(VisitState())
      val currentCount = prev.count + vals.size
      val newAvg = (prev.average * prev.count + vals.sum) / currentCount
      Some(VisitState(currentCount, newAvg))
    }
    val state: DStream[(String, VisitState)] = wordDstream.updateStateByKey[VisitState](update)
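The running-average arithmetic in the update function can be checked without a cluster. This sketch, assuming count is a Long, folds a few hypothetical batches for a single key through the same formula: new average = (old average × old count + sum of new values) / (old count + number of new values). RunningAverage and simulate are illustrative names, not Spark APIs; the if guard is only defensive, covering the case of no prior state and no new values, which updateStateByKey does not normally produce for a key.

```scala
// Assumption: count is a Long here; average is the running mean.
case class VisitState(count: Long = 0L, average: Double = 0.0)

object RunningAverage {
  // Same logic as the update function in the question, with an explicit
  // Option return type matching (Seq[V], Option[S]) => Option[S].
  val update: (Seq[Double], Option[VisitState]) => Option[VisitState] = (vals, state) => {
    val prev = state.getOrElse(VisitState())
    val currentCount = prev.count + vals.size
    if (currentCount == 0) state // nothing seen yet: avoid 0/0 = NaN
    else {
      val newAvg = (prev.average * prev.count + vals.sum) / currentCount
      Some(VisitState(currentCount, newAvg))
    }
  }

  // Feeds successive batches for one key through `update`, carrying the state forward.
  def simulate(batches: Seq[Seq[Double]]): Option[VisitState] =
    batches.foldLeft(Option.empty[VisitState])((st, batch) => update(batch, st))
}
```

For example, a batch containing 20 and 30 followed by a batch containing 40 yields a count of 3 and an average of 30.0, and a later empty batch leaves that state unchanged.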

And here are my classes:

    // Case classes are already Serializable and expose their constructor
    // parameters as fields, so the extra private vars are not needed.
    // marketValue is a Double to match the (String, Double) input.
    case class Visit(label: String, marketValue: Double) {
      override def toString: String = s"$label, value=$marketValue"
    }

and

    // count: number of values seen so far; average: running mean of those values
    case class VisitState(count: Long = 0L, average: Double = 0.0)

I have 2 issues:

  • Compile error:

        value updateStateByKey is not a member of org.apache.spark.streaming.dstream.DStream[Visit]
        [error] val state:DStream[(String, VisitState)] = wordDstream.updateStateByKey[Visit](update)

  • I don't really know how to instantiate my class with these values in Scala.
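The compile error is consistent with how Spark exposes updateStateByKey: it is defined on PairDStreamFunctions, which a DStream only gets through an implicit conversion when its elements are (key, value) pairs, so a DStream[Visit] has no such method. Below is a minimal sketch of the full pipeline under stated assumptions: Spark 1.3 or later (older versions also need import org.apache.spark.streaming.StreamingContext._ for that conversion), 60-second batches to get roughly one average per minute, a placeholder checkpoint path, and well-formed input lines. TickerAverage and toPair are hypothetical names.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

case class Visit(label: String, marketValue: Double)
case class VisitState(count: Long = 0L, average: Double = 0.0)

object TickerAverage {
  // updateStateByKey is only available on DStream[(K, V)], so each Visit
  // is turned into a (ticker, value) pair before calling it.
  def toPair(v: Visit): (String, Double) = (v.label, v.marketValue)

  // Running-average update with an explicit Option return type.
  val update: (Seq[Double], Option[VisitState]) => Option[VisitState] = (vals, state) => {
    val prev = state.getOrElse(VisitState())
    val currentCount = prev.count + vals.size
    if (currentCount == 0) state
    else Some(VisitState(currentCount, (prev.average * prev.count + vals.sum) / currentCount))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: 60-second batches, so the averages are printed about once a minute.
    val ssc = new StreamingContext(new SparkConf().setAppName("TickerAverage"), Seconds(60))
    // Stateful operations need a checkpoint directory ("checkpoint" is a placeholder).
    ssc.checkpoint("checkpoint")

    // Assumes "TICKER VALUE" lines such as "USD 20"; lines without two fields are skipped.
    val visits: DStream[Visit] = ssc.socketTextStream(args(0), args(1).toInt)
      .map(_.trim.split("\\s+"))
      .filter(_.length == 2)
      .map(a => Visit(a(0), a(1).toDouble))

    val averages: DStream[(String, VisitState)] =
      visits.map(toPair).updateStateByKey[VisitState](update)
    averages.print() // prints the first elements of each batch's state

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Keeping Visit as the parsed record and converting to a pair only at the updateStateByKey call lets the rest of the pipeline work with named fields while still satisfying the pair-DStream requirement.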

Any help?

Cheers,
Matt
