Welcome to Vigges Developer Community - Open, Learning, Share
Welcome To Ask or Share your Answers For Others

apache kafka - How to write Spark Streaming output to HDFS without overwriting

After some processing I have a DStream[(String, ArrayList[String])]. When I write it to HDFS using saveAsTextFile, every batch overwrites the data from the previous batch. How can I write each new result so that it is appended to the previous results?

output.foreachRDD(r => {
  r.saveAsTextFile(path)
})

Edit: I would also appreciate help converting the output to Avro format and then writing it to HDFS in append mode.


1 Answer


saveAsTextFile does not support append. If it is called with a fixed path in a streaming job, it overwrites that path on every batch. We could instead call saveAsTextFile(path + timestamp) to write each batch to a new location. That is the basic functionality of DStream.saveAsTextFiles(prefix), which names each batch's output "prefix-TIME_IN_MS".
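A minimal sketch of the per-batch naming idea, assuming a DStream called output as in the question. The object name PerBatchOutput and the helpers batchPath and saveEachBatch are hypothetical and only serve the illustration; the Spark calls used are foreachRDD (the overload that also passes the batch Time), RDD.isEmpty and RDD.saveAsTextFile.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

object PerBatchOutput {
  // Hypothetical helper: one output directory per batch, named "prefix-TIME_IN_MS"
  def batchPath(prefix: String, batchTimeMs: Long): String =
    s"$prefix-$batchTimeMs"

  // Hypothetical wrapper: writes every non-empty batch to its own directory,
  // so no batch overwrites the output of an earlier one
  def saveEachBatch[T](output: DStream[T], prefix: String): Unit =
    output.foreachRDD { (rdd: RDD[T], time: Time) =>
      if (!rdd.isEmpty()) {
        rdd.saveAsTextFile(batchPath(prefix, time.milliseconds))
      }
    }
}
```

Skipping empty batches is a design choice rather than a requirement; it only avoids creating a directory for intervals in which no data arrived.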

An easily accessible format that supports append is Parquet. We first transform each data RDD to a DataFrame or Dataset, and then we can use the write support, including the append save mode, offered on top of that abstraction.

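// Schema for one output record (fields elided)
case class DataStructure(field1,..., fieldn)

// ... streaming setup, dstream declaration, ...

// Convert each record to the case class so it can become a DataFrame row
val structuredOutput = outputDStream.map(record => mapFunctionRecordToDataStructure(record))
structuredOutput.foreachRDD { rdd =>
  // Brings toDF() into scope for RDDs of case classes
  import sparkSession.implicits._
  val df = rdd.toDF()
  // Append this batch to the existing Parquet data instead of overwriting it
  df.write.format("parquet").mode("append").save(s"$workDir/$targetFile")
}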

Note that appending to a Parquet target gets more expensive over time, because every batch adds more files to it, so the target still needs to be rotated from time to time.
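One way to rotate the target is to derive its name from the batch time, so that appends go to a fresh location once per period. The sketch below assumes a daily rotation in UTC; the object TargetRotation and the function dailyTarget are hypothetical names, and the period and naming scheme are illustrative choices, not something the answer prescribes.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object TargetRotation {
  // Formats an instant as a calendar day in UTC, e.g. 1970-01-01
  private val dayFormat: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC)

  // Hypothetical helper: returns a target path that changes once per UTC day,
  // so all batches from the same day append to the same location
  def dailyTarget(workDir: String, targetFile: String, epochMillis: Long): String = {
    val day = dayFormat.format(Instant.ofEpochMilli(epochMillis))
    s"$workDir/$targetFile-$day"
  }
}
```

Inside a foreachRDD that also receives the batch Time, the result of dailyTarget(workDir, targetFile, time.milliseconds) could be passed to save(...) in place of the fixed path.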

