Welcome to Vigges Developer Community - Open, Learning, Share
Welcome To Ask or Share your Answers For Others

apache spark - How does Structured Streaming ensure exactly-once writing semantics for file sinks?

I am writing a storage writer for Spark Structured Streaming which will partition the given DataFrame and write to a different blob store account. The Spark documentation says that it ensures exactly once semantics for file sinks, but it also says that exactly once semantics are only possible if the source is re-playable and the sink is idempotent.

  1. Is the blob store an idempotent sink if I write in parquet format?

  2. Also, how will the behavior change if I am doing streamingDF.writeStream.foreachBatch(...writing the DF here...).start()? Will it still guarantee exactly once semantics?

Possible duplicate: How to get Kafka offsets for structured query for manual and reliable offset management?

Update #1: Something like this:

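import org.apache.spark.sql.DataFrame

// `output`, `storagePaths` and `r` (a scala.util.Random) are assumed to be defined elsewhere
output
      .writeStream
      .foreachBatch((df: DataFrame, _: Long) => {
        // pick one of the three storage accounts at random for this batch
        val path = storagePaths(r.nextInt(3))

        df.persist()
        // append, because the default save mode fails once the path already exists
        df.write.mode("append").parquet(path)
        df.unpersist()
      })
      .start()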

1 Answer

Micro-Batch Stream Processing

I assume that the question is about Micro-Batch Stream Processing (not Continuous Stream Processing).

Exactly once semantics are guaranteed by two mechanisms: the internal registries of available and committed offsets (kept for the current stream execution, aka runId), and regular checkpoints (which persist the processing state across restarts).
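
As a minimal sketch of where those checkpoints live, the checkpoint directory is set per query with the checkpointLocation option. The rate source, the app name and both paths below are placeholders chosen for illustration; they are not taken from the question.

```scala
import org.apache.spark.sql.SparkSession

object FileSinkWithCheckpoint {
  // Hypothetical locations, for illustration only.
  val outputPath = "/tmp/example/output"
  val checkpointPath = "/tmp/example/checkpoint"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("file-sink-with-checkpoint")
      .master("local[2]")
      .getOrCreate()

    // The built-in "rate" source generates (timestamp, value) rows for testing.
    val input = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // Built-in parquet file sink; offsets and commits are tracked under checkpointPath.
    val query = input.writeStream
      .format("parquet")
      .option("path", outputPath)
      .option("checkpointLocation", checkpointPath)
      .start()

    query.awaitTermination()
  }
}
```

Restarting the same query with the same checkpointLocation is what lets it resume from the recorded offsets; pointing it at a fresh directory starts it over.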

"exactly once semantics are only possible if the source is re-playable and the sink is idempotent."

A batch that has already been processed, but whose completion was not recorded internally (see below), can be processed again after a failure:

  • That means that all streaming sources in a streaming query should be re-playable, so that data that has already been requested once can be requested again.

  • That also means that the sink should be idempotent: data that has been processed successfully and added to the sink may be added again if a failure happens just before Structured Streaming manages to record the data (offsets) as successfully processed (in the checkpoint).
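
For foreachBatch specifically, the Spark documentation states that it provides only at-least-once guarantees by default, and that the batchId passed to the function can be used to deduplicate output. A minimal sketch of one way to do that follows, assuming it is acceptable to give each batch its own directory. The object name, the batch_id= directory layout and the parameters are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.streaming.StreamingQuery

object IdempotentBatchWriter {
  // Deterministic output directory: the same batchId always maps to the same path.
  def batchPath(basePath: String, batchId: Long): String =
    s"$basePath/batch_id=$batchId"

  // Overwriting the batch's own directory means a replay of the same batch
  // replaces the earlier (possibly partial) output instead of adding duplicates.
  def writeBatch(basePath: String)(df: DataFrame, batchId: Long): Unit =
    df.write.mode(SaveMode.Overwrite).parquet(batchPath(basePath, batchId))

  // Wires the writer into a streaming query; checkpointPath is caller-supplied.
  def start(output: DataFrame, basePath: String, checkpointPath: String): StreamingQuery = {
    // An explicitly typed Scala function selects the Scala overload of foreachBatch.
    val writer: (DataFrame, Long) => Unit =
      (df, id) => writeBatch(basePath)(df, id)

    output.writeStream
      .foreachBatch(writer)
      .option("checkpointLocation", checkpointPath)
      .start()
  }
}
```

The same idea applies when writing to several storage accounts. If the target is derived from batchId (for example storagePaths((batchId % 3).toInt)) rather than picked at random, a replayed batch lands in the same place as the first attempt.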

Internals

Before the available data (by offset) of any of the streaming sources or readers is processed, MicroBatchExecution commits the offsets to the Write-Ahead Log (WAL) and prints out the following INFO message to the logs:

Committed offsets for batch [currentBatchId]. Metadata [offsetSeqMetadata]

A streaming query (a micro-batch) is executed only when there is new data available (based on offsets) or the last execution requires another micro-batch for state management.

In the addBatch phase, MicroBatchExecution requests the one and only Sink or StreamWriteSupport to process the available data.

Once a micro-batch finishes successfully, MicroBatchExecution commits the available offsets to the commits checkpoint, and the offsets are then considered processed.

MicroBatchExecution prints out the following DEBUG message to the logs:

Completed batch [currentBatchId]
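
The consequence of writing offsets before processing and commits after it can be sketched as a small, Spark-free model. This is a simplified illustration under stated assumptions, not the actual MicroBatchExecution code. The types Checkpoint, Action and the function onRestart are hypothetical names, and offsets are reduced to plain strings.

```scala
object BatchRecoverySketch {
  // Simplified stand-ins for the two logs kept in the checkpoint directory:
  // offsetLog is written before a batch runs, commitLog after it succeeds.
  final case class Checkpoint(offsetLog: Map[Long, String], commitLog: Set[Long])

  sealed trait Action
  case object StartFromScratch extends Action
  final case class RerunBatch(batchId: Long, offsets: String) extends Action
  final case class PlanNextBatch(batchId: Long) extends Action

  // Decide what a restarted query does, given the state of both logs.
  def onRestart(cp: Checkpoint): Action =
    if (cp.offsetLog.isEmpty) StartFromScratch
    else {
      val latest = cp.offsetLog.keys.max
      // Offsets recorded and commit recorded: the batch is done, move on.
      if (cp.commitLog.contains(latest)) PlanNextBatch(latest + 1)
      // Offsets recorded but no commit: the batch may or may not have reached
      // the sink, so it is run again with the same offsets.
      else RerunBatch(latest, cp.offsetLog(latest))
    }
}
```

The RerunBatch case is the window in which the sink can receive the same data twice, which is why the source has to be re-playable and the sink idempotent.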

