SparkStreaming-Kafka通过指定偏移量获取数据
1 数据源
310999003001, 3109990030010220140820141230
SparkStreaming-Kafka通过指定偏移量获取数据
1.数据源
'310999003001', '3109990030010220140820141230292','00000000','','2017-08-20 14:09:35','0',255,'SN', 0.00,'4','','310999','310999003001','02','','','2','','','2017-08-20 14:12:30','2017-08-20 14:16:13',0,0,'2017-08-21 18:50:05','','',' '
'310999003102', '3109990031020220140820141230266','粤BT96V3','','2017-08-20 14:09:35','0',21,'NS', 0.00,'2','','310999','310999003102','02','','','2','','','2017-08-20 14:12:30','2017-08-20 14:16:13',0,0,'2017-08-21 18:50:05','','',' '
2.生产者
import java.util.Properties import com.google.gson.{Gson, GsonBuilder} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * Date 2022/11/8 9:49 */ object KafkaEventProducer { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("KafkaEventProducer").setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") val topic = "ly_test" val props = new Properties() props.put("bootstrap.servers","node01:9092,node02:9092,node03:9092") props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer") props.put("acks","all") // props.put("security.protocol","SASL_PLAINTEXT") // props.put("sasl.mechanism","PLAIN") // props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='sjf' password='123sjf';") val kafkaProducer = new KafkaProducer[String,String](props) val srcRDD: RDD[String] = sc.textFile("file:///F:\\work\\sun\\lywork\\hbaseoper\\datas\\kafkaproducerdata.txt") val records: Array[Array[String]] = srcRDD.filter(!_.startsWith(";")).map(_.split(",")).collect() //对数据进行预处理形成json形式 for(record<-records){ val trafficInfo = new TrafficInfo(record(0),record(2),record(4),record(6),record(13)) // 不能用new Gson() 会出现 \u0027 // val trafficInfoJson: String = new Gson().toJson(trafficInfo) //使用Gson gson = new Gson(),进行对象转化json格式时,单引号会被转换成u0027代码。使用以下方法进行替换 val gson: Gson = new GsonBuilder().disableHtmlEscaping().create() val trafficInfoJson: String = gson.toJson(trafficInfo) kafkaProducer.send(new ProducerRecord[String,String](topic,trafficInfoJson)) println("Message Sent:"+trafficInfoJson) Thread.sleep(2000) } sc.stop() kafkaProducer.flush() kafkaProducer.close() } //相机编号 //车牌号 //时间 //速度 //车道编号 case class TrafficInfo(camer_id:String,car_id:String,event_time:String,car_speed:String,car_code:String) }
3.消费者获取指定偏移量
import java.text.SimpleDateFormat import java.util import java.util.Date import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies, OffsetRange} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} /** * Date 2022/11/5 16:38 */ /** * 通过偏移量获取数据 */ object AttainDataFromOffset { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("AttainDataFromOffset").setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") val ssc = new StreamingContext(sc,Seconds(5)) val kafkaParams: Map[String, Object] = Map[String, Object]( "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "ly", "auto.offset.reset" -> "earliest", "enable.auto.commit" -> (false: java.lang.Boolean) // kafka 带有账号密码sasl协议的认证 // "security.protocol" -> "SASL_PLAINTEXT", // "sasl.mechanism" -> "PLAIN", // "sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username='sjf' password='123sjf';" ) val topics = Array("ly_test") val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics, kafkaParams) ) val res: DStream[(String, String, Int, Long)] = stream.map(recoed => { val key: String = recoed.key() val value: String = recoed.value() val partionId: Int = recoed.partition() val offset: Long = recoed.offset() (key, value, partionId, offset) //println(key+"\t"+value+"\t"+partionId+"\t"+offset) }) // 指定偏移量 val offsetRanges = Array( // topic, partition, inclusive starting offset, exclusive ending offset OffsetRange("lawyee_test", 0, 1L, 10L) ) // 获取指定偏移量的数据 import scala.collection.JavaConverters._ val jkafkaParams: util.Map[String, Object] = kafkaParams.asJava val offsetRDD: RDD[ConsumerRecord[String, String]] = KafkaUtils.createRDD[String,String]( sc, jkafkaParams, offsetRanges, LocationStrategies.PreferConsistent ) val resRDD: RDD[(String, String, Int, Long,String,TimestampType)] = offsetRDD.map(recoed => { val key: String = recoed.key() val value: String = recoed.value() val partionId: Int = recoed.partition() val offset: Long = recoed.offset() var time: Long = recoed.timestamp() val timeStr = timeStampToDate(time) val timestampType: TimestampType = recoed.timestampType() (key, value, partionId, offset,timeStr,timestampType) //println(key+"\t"+value+"\t"+partionId+"\t"+offset) }) resRDD.foreach(println(_)) res.print() ssc.start() ssc.awaitTermination() } // 时间格式时间 转换为字符串时间 def dateToString(date:Date): String ={ val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") val strDate: String = simpleDateFormat.format(date) strDate } // 字符串时间转换为时间格式时间 def strToDate(str:String):Date = { val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") val date: Date = simpleDateFormat.parse(str) date } // 时间戳转化为字符串时间 def timeStampToDate(timeStamp:Long): String ={ val date = new Date(timeStamp) val strDate: String = dateToString(date) strDate } //字符串时间转化为时间戳 def dateToTimeStamp(strDate:String): Long ={ val date: Date = strToDate(strDate) val timeStamp: Long = date.getTime timeStamp } }
以上就是SparkStreaming-Kafka通过指定偏移量获取数据的详细内容,更多关于SparkStreaming Kafka获取数据的资料请关注好代码网其它相关文章!