spark-wordcount


To run the Spark word count in local mode, you need to add spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar to the classpath, and you must import SparkContext._ — without that import, the implicit conversion that provides reduceByKey on a pair RDD is not in scope.
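Instead of adding the assembly jar by hand, the same dependency can be pulled in through sbt. A minimal build.sbt sketch, assuming sbt 0.13 and the Scala version Spark 0.9.0 was built against (the project name is made up):

name := "spark-wordcount"

scalaVersion := "2.10.3"

// resolves to spark-core_2.10:0.9.0-incubating on Maven Central
libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"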

package com.spark

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // implicit conversion that adds reduceByKey to pair RDDs

object wordcount {
  def main(args: Array[String]) {
    // args: master, app name, Spark home, jars to ship to the cluster (none needed in local mode)
    val sc = new SparkContext("local", "Simple App",
      "E:\\00_spark_scala\\spark\\spark-0.9.0-incubating-bin-hadoop1",
      null) // or List("lib/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar")
    val logFile = sc.textFile("hdfs://namenode:9000/user/input/input.txt") // should be some file on your system
    // a local file works as well: val file = sc.textFile("D:\\test.txt")
    val counts = logFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    println(counts) // prints the RDD's description, not its contents; see the console output below
    counts.saveAsTextFile("D:\\output\\20140404")
  }
}
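Note that println(counts) only prints the RDD's lineage description (the MapPartitionsRDD[8] line in the console output below), not the word counts themselves. To print the counts at the driver, the RDD has to be materialized first; a minimal sketch using collect (fine here, but it pulls the whole result into driver memory):

counts.collect().foreach(println) // e.g. (keyword1,1), matching part-00000 below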
input:

keyword1
keyword3
keyword2

console output:

log4j:WARN No appenders could be found for logger (akka.event.slf4j.Slf4jLogger).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
14/04/04 16:13:31 INFO SparkEnv: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/04/04 16:13:31 INFO SparkEnv: Registering BlockManagerMaster
14/04/04 16:13:31 INFO DiskBlockManager: Created local directory at C:\Users\WENBO_~1\AppData\Local\Temp\spark-local-20140404161331-aac1
14/04/04 16:13:31 INFO MemoryStore: MemoryStore started with capacity 1068.8 MB.
14/04/04 16:13:31 INFO ConnectionManager: Bound socket to port 49790 with id = ConnectionManagerId(XA-NA18818395.allyes.group,49790)
14/04/04 16:13:31 INFO BlockManagerMaster: Trying to register BlockManager
14/04/04 16:13:31 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager XA-NA18818395.allyes.group:49790 with 1068.8 MB RAM
14/04/04 16:13:31 INFO BlockManagerMaster: Registered BlockManager
14/04/04 16:13:31 INFO HttpServer: Starting HTTP Server
14/04/04 16:13:31 INFO HttpBroadcast: Broadcast server started at http://10.200.33.176:49791
14/04/04 16:13:31 INFO SparkEnv: Registering MapOutputTracker
14/04/04 16:13:31 INFO HttpFileServer: HTTP File server directory is C:\Users\WENBO_~1\AppData\Local\Temp\spark-19614fe9-9131-4382-8491-8725dd29bfdf
14/04/04 16:13:31 INFO HttpServer: Starting HTTP Server
14/04/04 16:13:31 INFO SparkUI: Started Spark Web UI at http://XA-NA18818395.allyes.group:4040
14/04/04 16:13:31 INFO MemoryStore: ensureFreeSpace(32960) called with curMem=0, maxMem=1120744243
14/04/04 16:13:31 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.2 KB, free 1068.8 MB)
14/04/04 16:13:31 INFO MemoryStore: ensureFreeSpace(32960) called with curMem=32960, maxMem=1120744243
14/04/04 16:13:31 INFO MemoryStore: Block broadcast_1 stored as values to memory (estimated size 32.2 KB, free 1068.8 MB)
14/04/04 16:13:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/04/04 16:13:32 WARN LoadSnappy: Snappy native library not loaded
14/04/04 16:13:32 INFO FileInputFormat: Total input paths to process : 1
MapPartitionsRDD[8] at reduceByKey at wordcount.scala:12
14/04/04 16:13:32 INFO SparkContext: Starting job: saveAsTextFile at wordcount.scala:14
14/04/04 16:13:32 INFO DAGScheduler: Registering RDD 6 (reduceByKey at wordcount.scala:12)
14/04/04 16:13:32 INFO DAGScheduler: Got job 0 (saveAsTextFile at wordcount.scala:14) with 1 output partitions (allowLocal=false)
14/04/04 16:13:32 INFO DAGScheduler: Final stage: Stage 0 (saveAsTextFile at wordcount.scala:14)
14/04/04 16:13:32 INFO DAGScheduler: Parents of final stage: List(Stage 1)
14/04/04 16:13:32 INFO DAGScheduler: Missing parents: List(Stage 1)
14/04/04 16:13:32 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[6] at reduceByKey at wordcount.scala:12), which has no missing parents
14/04/04 16:13:32 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[6] at reduceByKey at wordcount.scala:12)
14/04/04 16:13:32 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/04/04 16:13:32 INFO TaskSetManager: Starting task 1.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/04/04 16:13:32 INFO TaskSetManager: Serialized task 1.0:0 as 1919 bytes in 4 ms
14/04/04 16:13:32 INFO Executor: Running task ID 0
14/04/04 16:13:32 INFO BlockManager: Found block broadcast_0 locally
14/04/04 16:13:32 INFO HadoopRDD: Input split: hdfs://namenode:9000/user/input/input.txt:0+27
14/04/04 16:13:32 INFO Executor: Serialized size of result for 0 is 762
14/04/04 16:13:32 INFO Executor: Sending result for 0 directly to driver
14/04/04 16:13:32 INFO Executor: Finished task ID 0
14/04/04 16:13:32 INFO TaskSetManager: Finished TID 0 in 188 ms on localhost (progress: 0/1)
14/04/04 16:13:32 INFO TaskSchedulerImpl: Remove TaskSet 1.0 from pool 
14/04/04 16:13:32 INFO DAGScheduler: Completed ShuffleMapTask(1, 0)
14/04/04 16:13:32 INFO DAGScheduler: Stage 1 (reduceByKey at wordcount.scala:12) finished in 0.195 s
14/04/04 16:13:32 INFO DAGScheduler: looking for newly runnable stages
14/04/04 16:13:32 INFO DAGScheduler: running: Set()
14/04/04 16:13:32 INFO DAGScheduler: waiting: Set(Stage 0)
14/04/04 16:13:32 INFO DAGScheduler: failed: Set()
14/04/04 16:13:32 INFO DAGScheduler: Missing parents for Stage 0: List()
14/04/04 16:13:32 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[9] at saveAsTextFile at wordcount.scala:14), which is now runnable
14/04/04 16:13:32 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[9] at saveAsTextFile at wordcount.scala:14)
14/04/04 16:13:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/04/04 16:13:32 INFO TaskSetManager: Starting task 0.0:0 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/04/04 16:13:32 INFO TaskSetManager: Serialized task 0.0:0 as 5336 bytes in 0 ms
14/04/04 16:13:32 INFO Executor: Running task ID 1
14/04/04 16:13:32 INFO BlockManager: Found block broadcast_0 locally
14/04/04 16:13:32 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-zero-bytes blocks out of 1 blocks
14/04/04 16:13:32 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in  3 ms
14/04/04 16:13:32 INFO FileOutputCommitter: Saved output of task 'attempt_201404041613_0000_m_000000_1' to file:/D:/output/20140404
14/04/04 16:13:32 INFO SparkHadoopWriter: attempt_201404041613_0000_m_000000_1: Committed
14/04/04 16:13:32 INFO Executor: Serialized size of result for 1 is 817
14/04/04 16:13:32 INFO Executor: Sending result for 1 directly to driver
14/04/04 16:13:32 INFO Executor: Finished task ID 1
14/04/04 16:13:32 INFO TaskSetManager: Finished TID 1 in 65 ms on localhost (progress: 0/1)
14/04/04 16:13:32 INFO TaskSchedulerImpl: Remove TaskSet 0.0 from pool 
14/04/04 16:13:32 INFO DAGScheduler: Completed ResultTask(0, 0)
14/04/04 16:13:32 INFO DAGScheduler: Stage 0 (saveAsTextFile at wordcount.scala:14) finished in 0.066 s
14/04/04 16:13:32 INFO SparkContext: Job finished: saveAsTextFile at wordcount.scala:14, took 0.358104066 s

part-00000:

(keyword1,1)
(keyword3,1)
(keyword2,1)


Run spark-shell in distributed mode:

  • MASTER=spark://hostname:7077 ./spark-shell

The hostname here can be read off the Spark master web UI at http://namenode:8080/.
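Once the shell is connected to the cluster, the same word count can be run interactively; sc is predefined in the shell and the SparkContext._ implicits are already imported. A minimal sketch, reusing the HDFS input path from above (the output directory is made up):

val counts = sc.textFile("hdfs://namenode:9000/user/input/input.txt")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.collect().foreach(println) // print the counts at the driver
counts.saveAsTextFile("hdfs://namenode:9000/user/output/wordcount") // hypothetical output path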
