Spark Streaming: Real-Time WordCount on Network Port Data and HDFS Data
1. POM Configuration
Because we use IDEA on Windows to connect to Spark running in a virtual machine, the dependencies need to be configured as follows.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.imooc.spark</groupId>
  <artifactId>sparktrain</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>

  <properties>
    <scala.version>2.11.4</scala.version>
    <kafka.version>1.0.0</kafka.version>
    <spark.version>2.4.0</spark.version>
    <hadoop.version>2.6.1</hadoop.version>
    <hbase.version>1.2.6</hbase.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.17.Final</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>2.9.1</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.31</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
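Note the _2.11 suffix on kafka_2.11, jackson-module-scala_2.11 and spark-streaming_2.11: it is the Scala binary version and must agree with scala.version (2.11.x here), otherwise the job fails at runtime with binary-incompatibility errors.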
2. Receiving Network Data in Real Time
- 1. Code
package Sparkstreaming

import java.sql.DriverManager
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetWorkCount {
  def main(args: Array[String]): Unit = {
    // Use at least two local threads: one for the socket receiver, one for processing
    val conf = new SparkConf().setAppName("NetWorkCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // On Windows you may need to point Spark at the winutils directory:
    // System.setProperty("hadoop.home.dir", "E:\\MVN\\hadoop-common-2.2.0-bin-master")
    val lines = ssc.socketTextStream("192.168.116.10", 9999)
    // reduceByKey(_ + _) is equivalent to reduceByKey((x, y) => x + y): whenever two
    // records share a key, their values x and y are combined with x + y
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print()
    // Write each batch to MySQL, opening one connection per partition
    result.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val connection = createconnection()
        partitionOfRecords.foreach { record =>
          val sql = "insert into wordcount values('" + record._1 + "'," + record._2 + ")"
          connection.createStatement().execute(sql)
        }
        connection.close()
      }
    }
    ssc.start()
    // awaitTermination blocks until the streaming computation is stopped,
    // keeping the main thread alive while the receiver runs
    ssc.awaitTermination()
  }

  def createconnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://192.168.116.10:3306/test", "root", "123456")
  }
}
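Opening the connection inside foreachPartition, rather than once on the driver or once per record, matters: a JDBC connection is not serializable, so it cannot be created on the driver and shipped to the executors, and creating one per record would be far too expensive. The string-concatenated SQL above is fine for a demo but breaks on words containing quotes; a minimal sketch of the same partition-level write using a PreparedStatement (reusing result and createconnection() from the listing above, and assuming the same wordcount(word, count) table) could look like this:

result.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createconnection()
    // Compile the statement once per partition and reuse it for every record
    val stmt = connection.prepareStatement("insert into wordcount values (?, ?)")
    partitionOfRecords.foreach { case (word, count) =>
      stmt.setString(1, word)
      stmt.setInt(2, count)
      stmt.executeUpdate()
    }
    stmt.close()
    connection.close()
  }
}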
- 2. Testing
Open a new terminal window on the virtual machine and run nc -lk 9999 (the port must match the one passed to socketTextStream above).
Then run the Spark program from IDEA and type a few words into the nc session; the word counts appear in the IDEA console.
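Each 5-second batch prints a block roughly like the following (the timestamp will differ, and the counts depend on what you typed):

-------------------------------------------
Time: 1557924765000 ms
-------------------------------------------
(hello,2)
(spark,1)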
- 3. Troubleshooting
When running on Windows, you may get the error "Failed to locate the winutils binary in the hadoop binary path". Download the whole bin directory from GitHub, point your local environment variables (HADOOP_HOME and PATH) at it, then type hadoop in cmd to check that the environment is set up correctly.
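Alternatively, as the commented-out line in the code above hints, you can set the property from inside the program before the StreamingContext is created (the path below is just an example and must point at wherever you unpacked the bin directory):

// Must run before Spark touches any Hadoop classes
System.setProperty("hadoop.home.dir", "E:\\MVN\\hadoop-common-2.2.0-bin-master")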
3. Receiving HDFS Data in Real Time
- 1. Code
package Sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Filewordcount {
  def main(args: Array[String]): Unit = {
    // A file stream needs no receiver thread, so a single local thread suffices
    val conf = new SparkConf().setAppName("Filewordcount").setMaster("local")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Watch an HDFS directory; each new file that appears there is read as a stream of lines
    val lines = ssc.textFileStream("hdfs://192.168.116.10:9000/sqoop/hdfs/")
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
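Note that textFileStream only picks up files created in, or moved into, the directory after the streaming context has started; files already present are ignored. To test, upload a new file while the job is running, e.g. hdfs dfs -put words.txt /sqoop/hdfs/ (words.txt being any local text file), and the counts show up in the next batch.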
4. Filtering Against a Name List
- 1. Code
package Sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Filtername {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Filtername").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Build the name list to filter against: ("good", true), ("better", true)
    val names = List("good", "better")
    val namesRDD = ssc.sparkContext.parallelize(names).map((_, true))
    val lines = ssc.socketTextStream("192.168.116.10", 6789)
    // Key each line by its second comma-separated field, left-outer-join it against
    // the name list, and keep only the lines whose name appears in the list
    val result = lines.map(x => (x.split(",")(1), x))
      .transform(rdd => rdd.leftOuterJoin(namesRDD))
      .filter(x => x._2._2.getOrElse("good") != "good")
      .map(x => x._2._1)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
- 2. Testing
Type 2017,now and 2018,good into an nc -lk 6789 session, each on its own line; only 2018,good is printed, because only records whose name appears in the list pass the filter.
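To see why, trace what the pipeline produces for these two lines (intermediate values shown as comments):

// After map(x => (x.split(",")(1), x)):
//   ("now",  "2017,now")   and   ("good", "2018,good")
// After leftOuterJoin(namesRDD), each element is (name, (line, Option[Boolean])):
//   ("now",  ("2017,now",  None))        -> getOrElse("good") yields "good"; "good" != "good" is false -> dropped
//   ("good", ("2018,good", Some(true)))  -> getOrElse("good") yields true;   true != "good" is true   -> kept
// The final map(x => x._2._1) recovers the surviving original line: 2018,good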