package org.apache.spark.broadcast

import java.io._
import java.nio.ByteBuffer
import java.util.zip.Adler32

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.Random

import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage._
import org.apache.spark.util.Utils
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}

/**
 * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
 *
 * The driver divides the serialized object into small chunks and stores those chunks in the
 * driver's BlockManager. On each executor, the executor first attempts to fetch the object from
 * its own BlockManager; if it is not present, it fetches the chunks from the driver and/or other
 * executors, then stores the fetched chunks in its own BlockManager so that other executors can
 * fetch from it as well. This prevents the driver from being the bottleneck when sending out
 * multiple copies of the broadcast data (one per executor).
 *
 * When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
 *
 * @param obj object to broadcast
 * @param id A unique identifier for the broadcast variable.
 */
private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
  extends Broadcast[T](id) with Logging with Serializable {

  /**
   * Value of the broadcast object on executors. This is reconstructed by [[readBroadcastBlock]],
   * which builds this value by reading blocks from the driver and/or other executors.
   */
  @transient private lazy val _value: T = readBroadcastBlock()

  /** The compression codec to use, or None if compression is disabled. */
  @transient private var compressionCodec: Option[CompressionCodec] = _
  /** Size of each block. Default value is 4MB. This value is only read by the broadcaster. */
  @transient private var blockSize: Int = _

  private def setConf(conf: SparkConf) {
    compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
      Some(CompressionCodec.createCodec(conf))
    } else {
      None
    }
    // Note: use getSizeAsKb (not bytes) to maintain compatibility if no units are provided
    blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024
    checksumEnabled = conf.getBoolean("spark.broadcast.checksum", true)
  }
  setConf(SparkEnv.get.conf)

  private val broadcastId = BroadcastBlockId(id)

  /** Total number of blocks this broadcast variable contains. */
  private val numBlocks: Int = writeBlocks(obj)

  /** Whether to generate checksums for blocks or not. */
  private var checksumEnabled: Boolean = false
  /** The checksums for all the blocks. */
  private var checksums: Array[Int] = _

  override protected def getValue() = {
    _value
  }

  private def calcChecksum(block: ByteBuffer): Int = {
    val adler = new Adler32()
    if (block.hasArray) {
      adler.update(block.array, block.arrayOffset + block.position(), block.limit()
        - block.position())
    } else {
      val bytes = new Array[Byte](block.remaining())
      block.duplicate.get(bytes)
      adler.update(bytes)
    }
    adler.getValue.toInt
  }

  /**
   * Divide the object into multiple blocks and put those blocks in the block manager.
   *
   * @param value the object to divide
   * @return number of blocks this broadcast variable is divided into
   */
  private def writeBlocks(value: T): Int = {
    import StorageLevel._
    // Store a copy of the broadcast variable in the driver so that tasks run on the driver
    // do not create a duplicate copy of the broadcast variable's value.
    val blockManager = SparkEnv.get.blockManager
    if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
      throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    }
    val blocks =
      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
    if (checksumEnabled) {
      checksums = new Array[Int](blocks.length)
    }
    blocks.zipWithIndex.foreach { case (block, i) =>
      if (checksumEnabled) {
        checksums(i) = calcChecksum(block)
      }
      val pieceId = BroadcastBlockId(id, "piece" + i)
      val bytes = new ChunkedByteBuffer(block.duplicate())
      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
      }
    }
    blocks.length
  }

  /** Fetch torrent blocks from the driver and/or other executors. */
  private def readBlocks(): Array[BlockData] = {
    // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and
    // reported to the driver, so other executors can pull these chunks from this executor too.
    val blocks = new Array[BlockData](numBlocks)
    val bm = SparkEnv.get.blockManager

    for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
      val pieceId = BroadcastBlockId(id, "piece" + pid)
      logDebug(s"Reading piece $pieceId of $broadcastId")
      // First try getLocalBytes because previous attempts to fetch the broadcast blocks may have
      // already fetched some of the blocks, in which case they are available locally.
      bm.getLocalBytes(pieceId) match {
        case Some(block) =>
          blocks(pid) = block
          releaseLock(pieceId)
        case None =>
          bm.getRemoteBytes(pieceId) match {
            case Some(b) =>
              if (checksumEnabled) {
                val sum = calcChecksum(b.chunks(0))
                if (sum != checksums(pid)) {
                  throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" +
                    s" $sum != ${checksums(pid)}")
                }
              }
              // We found the block from remote executors'/driver's BlockManager, so put the
              // block in this executor's BlockManager.
              if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
                throw new SparkException(
                  s"Failed to store $pieceId of $broadcastId in local BlockManager")
              }
              blocks(pid) = new ByteBufferBlockData(b, true)
            case None =>
              throw new SparkException(s"Failed to get $pieceId of $broadcastId")
          }
      }
    }
    blocks
  }

  /** Remove all persisted state associated with this Torrent broadcast on the executors. */
  override protected def doUnpersist(blocking: Boolean) {
    TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
  }

  /**
   * Remove all persisted state associated with this Torrent broadcast on the executors
   * and driver.
   */
  override protected def doDestroy(blocking: Boolean) {
    TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking)
  }

  /** Used by the JVM when serializing this object. */
  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
    assertValid()
    out.defaultWriteObject()
  }

  private def readBroadcastBlock(): T = Utils.tryOrIOException {
    TorrentBroadcast.synchronized {
      setConf(SparkEnv.get.conf)
      val blockManager = SparkEnv.get.blockManager
      blockManager.getLocalValues(broadcastId) match {
        case Some(blockResult) =>
          if (blockResult.data.hasNext) {
            val x = blockResult.data.next().asInstanceOf[T]
            releaseLock(broadcastId)
            x
          } else {
            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
          }
        case None =>
          logInfo("Started reading broadcast variable " + id)
          val startTimeMs = System.currentTimeMillis()
          val blocks = readBlocks()
          logInfo("Reading broadcast variable " + id + " took " + Utils.getUsedTimeMs(startTimeMs))

          try {
            val obj = TorrentBroadcast.unBlockifyObject[T](
              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
            // Store the merged copy in BlockManager so other tasks on this executor don't
            // need to re-fetch it.
            val storageLevel = StorageLevel.MEMORY_AND_DISK
            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
            }
            obj
          } finally {
            blocks.foreach(_.dispose())
          }
      }
    }
  }

  /**
   * If running in a task, register the given block's lock for release upon task completion.
   * Otherwise (i.e. on the driver), release the lock immediately.
   */
  private def releaseLock(blockId: BlockId): Unit = {
    val blockManager = SparkEnv.get.blockManager
    Option(TaskContext.get()) match {
      case Some(taskContext) =>
        taskContext.addTaskCompletionListener(_ => blockManager.releaseLock(blockId))
      case None =>
        // This should only happen on the driver, where broadcast variables may be accessed
        // outside of running tasks. Release the lock immediately so the block can be freed.
        blockManager.releaseLock(blockId)
    }
  }
}

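/**
 * Helpers shared by all TorrentBroadcast instances: chunking a value into serialized (and
 * optionally compressed) blocks, reassembling those blocks back into a value, and removing
 * persisted broadcast blocks cluster-wide.
 *
 * A minimal round-trip sketch, not part of the original file. It assumes a
 * [[org.apache.spark.serializer.JavaSerializer]], Spark's internal ByteBufferInputStream
 * (org.apache.spark.util.ByteBufferInputStream, not imported here), and an arbitrary 4 KB
 * block size, purely for illustration:
 *
 * {{{
 *   val conf = new SparkConf()
 *   val codec = Some(CompressionCodec.createCodec(conf))
 *   // Serialize into chunks of at most 4 KB, then rebuild the value from those chunks.
 *   val chunks = TorrentBroadcast.blockifyObject(
 *     Seq(1, 2, 3), 4096, new JavaSerializer(conf), codec)
 *   val restored = TorrentBroadcast.unBlockifyObject[Seq[Int]](
 *     chunks.map(b => new ByteBufferInputStream(b): InputStream),
 *     new JavaSerializer(conf), codec)
 *   assert(restored == Seq(1, 2, 3))
 * }}}
 */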
private object TorrentBroadcast extends Logging {

  /** Serialize (and optionally compress) the object into chunks of at most blockSize bytes. */
  def blockifyObject[T: ClassTag](
      obj: T,
      blockSize: Int,
      serializer: Serializer,
      compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
    val cbbos = new ChunkedByteBufferOutputStream(blockSize, ByteBuffer.allocate)
    val out = compressionCodec.map(c => c.compressedOutputStream(cbbos)).getOrElse(cbbos)
    val ser = serializer.newInstance()
    val serOut = ser.serializeStream(out)
    Utils.tryWithSafeFinally {
      serOut.writeObject[T](obj)
    } {
      serOut.close()
    }
    cbbos.toChunkedByteBuffer.getChunks()
  }

  /** Reassemble the original object by decompressing and deserializing the given chunks. */
  def unBlockifyObject[T: ClassTag](
      blocks: Array[InputStream],
      serializer: Serializer,
      compressionCodec: Option[CompressionCodec]): T = {
    require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
    val is = new SequenceInputStream(blocks.iterator.asJavaEnumeration)
    val in: InputStream = compressionCodec.map(c => c.compressedInputStream(is)).getOrElse(is)
    val ser = serializer.newInstance()
    val serIn = ser.deserializeStream(in)
    val obj = Utils.tryWithSafeFinally {
      serIn.readObject[T]()
    } {
      serIn.close()
    }
    obj
  }

  /**
   * Remove all persisted blocks associated with this torrent broadcast on the executors.
   * If removeFromDriver is true, also remove these persisted blocks on the driver.
   */
  def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = {
    logDebug(s"Unpersisting TorrentBroadcast $id")
    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
  }
}
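
// ---------------------------------------------------------------------------------------------
// Usage sketch (illustration only, not part of the original file). User code never constructs
// TorrentBroadcast directly; SparkContext.broadcast goes through the BroadcastManager and
// TorrentBroadcastFactory, which build one on the driver, and executors lazily materialize the
// value on first access:
//
//   val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
//   val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))    // writeBlocks() runs on the driver
//   val total = sc.parallelize(Seq("a", "b", "a"))
//     .map(k => lookup.value.getOrElse(k, 0))             // readBroadcastBlock() on first access
//     .sum()
//   lookup.destroy()                                      // doDestroy() -> TorrentBroadcast.unpersist
//
// Configuration read by setConf() above: spark.broadcast.compress, spark.broadcast.blockSize
// and spark.broadcast.checksum.
// ---------------------------------------------------------------------------------------------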