前端技术
HTML
CSS
Javascript
前端框架和UI库
VUE
ReactJS
AngularJS
JQuery
NodeJS
JSON
Element-UI
Bootstrap
Material UI
服务端和客户端
Java
Python
PHP
Golang
Scala
Kotlin
Groovy
Ruby
Lua
.net
c#
c++
后端WEB和工程框架
SpringBoot
SpringCloud
Struts2
MyBatis
Hibernate
Tornado
Beego
Go-Spring
Go Gin
Go Iris
Dubbo
HessianRPC
Maven
Gradle
数据库
MySQL
Oracle
Mongo
中间件与web容器
Redis
MemCache
Etcd
Cassandra
Kafka
RabbitMQ
RocketMQ
ActiveMQ
Nacos
Consul
Tomcat
Nginx
Netty
大数据技术
Hive
Impala
ClickHouse
DorisDB
Greenplum
PostgreSQL
HBase
Kylin
Hadoop
Apache Pig
ZooKeeper
SeaTunnel
Sqoop
Datax
Flink
Spark
Mahout
数据搜索与日志
ElasticSearch
Apache Lucene
Apache Solr
Kibana
Logstash
数据可视化与OLAP
Apache Atlas
Superset
Saiku
Tesseract
系统与容器
Linux
Shell
Docker
Kubernetes
[Python Elasticsearch...]的搜索结果
这里是文章列表。热门标签的颜色随机变换,标签颜色没有特殊含义。
点击某个标签可搜索标签相关的文章。
点击某个标签可搜索标签相关的文章。
转载文章
... Streaming应用程序就会去Kafka中Pull数据过来进行计算和消费,并把计算后的数据放入到持久化系统中(MySQL) 广告点击系统实时分析的意义:因为可以在线实时的看见广告的投放效果,就为广告的更大规模的投入和调整打下了坚实的基础,从而为公司带来最大化的经济回报。 核心需求: 1、实时黑名单动态过滤出有效的用户广告点击行为:因为黑名单用户可能随时出现,所以需要动态更新; 2、在线计算广告点击流量; 3、Top3热门广告; 4、每个广告流量趋势; 5、广告点击用户的区域分布分析 6、最近一分钟的广告点击量; 7、整个广告点击Spark Streaming处理程序724小时运行; 数据格式: 时间、用户、广告、城市等 技术细节: 在线计算用户点击的次数分析,屏蔽IP等; 使用updateStateByKey或者mapWithState进行不同地区广告点击排名的计算; Spark Streaming+Spark SQL+Spark Core等综合分析数据; 使用Window类型的操作; 高可用和性能调优等等; 流量趋势,一般会结合DB等; Spark Core / /package com.tom.spark.SparkApps.sparkstreaming;import java.util.Date;import java.util.HashMap;import java.util.Map;import java.util.Properties;import java.util.Random;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;/ 数据生成代码,Kafka Producer产生数据/public class MockAdClickedStat {/ @param args/public static void main(String[] args) {final Random random = new Random();final String[] provinces = new String[]{"Guangdong", "Zhejiang", "Jiangsu", "Fujian"};final Map<String, String[]> cities = new HashMap<String, String[]>();cities.put("Guangdong", new String[]{"Guangzhou", "Shenzhen", "Dongguan"});cities.put("Zhejiang", new String[]{"Hangzhou", "Wenzhou", "Ningbo"});cities.put("Jiangsu", new String[]{"Nanjing", "Suzhou", "Wuxi"});cities.put("Fujian", new String[]{"Fuzhou", "Xiamen", "Sanming"});final String[] ips = new String[] {"192.168.112.240","192.168.112.239","192.168.112.245","192.168.112.246","192.168.112.247","192.168.112.248","192.168.112.249","192.168.112.250","192.168.112.251","192.168.112.252","192.168.112.253","192.168.112.254",};/ Kafka相关的基本配置信息/Properties kafkaConf = new Properties();kafkaConf.put("serializer.class", "kafka.serializer.StringEncoder");kafkaConf.put("metadeta.broker.list", "Master:9092,Worker1:9092,Worker2:9092");ProducerConfig producerConfig = new ProducerConfig(kafkaConf);final Producer<Integer, String> producer = new Producer<Integer, String>(producerConfig);new Thread(new Runnable() {public void run() {while(true) {//在线处理广告点击流的基本数据格式:timestamp、ip、userID、adID、province、cityLong timestamp = new Date().getTime();String ip = ips[random.nextInt(12)]; //可以采用网络上免费提供的ip库int userID = random.nextInt(10000);int adID = random.nextInt(100);String province = provinces[random.nextInt(4)];String city = cities.get(province)[random.nextInt(3)];String clickedAd = timestamp + "\t" + ip + "\t" + userID + "\t" + adID + "\t" + province + "\t" + city;producer.send(new KeyedMessage<Integer, String>("AdClicked", clickedAd));try {Thread.sleep(50);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} }} }).start();} } package com.tom.spark.SparkApps.sparkstreaming;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.util.ArrayList;import java.util.Arrays;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Set;import java.util.concurrent.LinkedBlockingQueue;import kafka.serializer.StringDecoder;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.sql.DataFrame;import org.apache.spark.sql.Row;import org.apache.spark.sql.RowFactory;import org.apache.spark.sql.hive.HiveContext;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructType;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaPairInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;import org.apache.spark.streaming.kafka.KafkaUtils;import com.google.common.base.Optional;import scala.Tuple2;/ 数据处理,Kafka消费者/public class AdClickedStreamingStats {/ @param args/public static void main(String[] args) {// TODO Auto-generated method stub//好处:1、checkpoint 2、工厂final SparkConf conf = new SparkConf().setAppName("SparkStreamingOnKafkaDirect").setMaster("hdfs://Master:7077/");final String checkpointDirectory = "hdfs://Master:9000/library/SparkStreaming/CheckPoint_Data";JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {public JavaStreamingContext create() {// TODO Auto-generated method stubreturn createContext(checkpointDirectory, conf);} };/ 可以从失败中恢复Driver,不过还需要指定Driver这个进程运行在Cluster,并且在提交应用程序的时候制定--supervise;/JavaStreamingContext javassc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);/ 第三步:创建Spark Streaming输入数据来源input Stream: 1、数据输入来源可以基于File、HDFS、Flume、Kafka、Socket等 2、在这里我们指定数据来源于网络Socket端口,Spark Streaming连接上该端口并在运行的时候一直监听该端口的数据 (当然该端口服务首先必须存在),并且在后续会根据业务需要不断有数据产生(当然对于Spark Streaming 应用程序的运行而言,有无数据其处理流程都是一样的) 3、如果经常在每间隔5秒钟没有数据的话不断启动空的Job其实会造成调度资源的浪费,因为并没有数据需要发生计算;所以 实际的企业级生成环境的代码在具体提交Job前会判断是否有数据,如果没有的话就不再提交Job;///创建Kafka元数据来让Spark Streaming这个Kafka Consumer利用Map<String, String> kafkaParameters = new HashMap<String, String>();kafkaParameters.put("metadata.broker.list", "Master:9092,Worker1:9092,Worker2:9092");Set<String> topics = new HashSet<String>();topics.add("SparkStreamingDirected");JavaPairInputDStream<String, String> adClickedStreaming = KafkaUtils.createDirectStream(javassc, String.class, String.class, StringDecoder.class, StringDecoder.class,kafkaParameters, topics);/因为要对黑名单进行过滤,而数据是在RDD中的,所以必然使用transform这个函数; 但是在这里我们必须使用transformToPair,原因是读取进来的Kafka的数据是Pair<String,String>类型, 另一个原因是过滤后的数据要进行进一步处理,所以必须是读进的Kafka数据的原始类型 在此再次说明,每个Batch Duration中实际上讲输入的数据就是被一个且仅被一个RDD封装的,你可以有多个 InputDStream,但其实在产生job的时候,这些不同的InputDStream在Batch Duration中就相当于Spark基于HDFS 数据操作的不同文件来源而已罢了。/JavaPairDStream<String, String> filteredadClickedStreaming = adClickedStreaming.transformToPair(new Function<JavaPairRDD<String,String>, JavaPairRDD<String,String>>() {public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {/ 在线黑名单过滤思路步骤: 1、从数据库中获取黑名单转换成RDD,即新的RDD实例封装黑名单数据; 2、然后把代表黑名单的RDD的实例和Batch Duration产生的RDD进行Join操作, 准确的说是进行leftOuterJoin操作,也就是说使用Batch Duration产生的RDD和代表黑名单的RDD实例进行 leftOuterJoin操作,如果两者都有内容的话,就会是true,否则的话就是false 我们要留下的是leftOuterJoin结果为false; /final List<String> blackListNames = new ArrayList<String>();JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();jdbcWrapper.doQuery("SELECT FROM blacklisttable", null, new ExecuteCallBack() {public void resultCallBack(ResultSet result) throws Exception {while(result.next()){blackListNames.add(result.getString(1));} }});List<Tuple2<String, Boolean>> blackListTuple = new ArrayList<Tuple2<String,Boolean>>();for(String name : blackListNames) {blackListTuple.add(new Tuple2<String, Boolean>(name, true));}List<Tuple2<String, Boolean>> blacklistFromListDB = blackListTuple; //数据来自于查询的黑名单表并且映射成为<String, Boolean>JavaSparkContext jsc = new JavaSparkContext(rdd.context());/ 黑名单的表中只有userID,但是如果要进行join操作的话就必须是Key-Value,所以在这里我们需要 基于数据表中的数据产生Key-Value类型的数据集合/JavaPairRDD<String, Boolean> blackListRDD = jsc.parallelizePairs(blacklistFromListDB);/ 进行操作的时候肯定是基于userID进行join,所以必须把传入的rdd进行mapToPair操作转化成为符合格式的RDD/JavaPairRDD<String, Tuple2<String, String>> rdd2Pair = rdd.mapToPair(new PairFunction<Tuple2<String,String>, String, Tuple2<String, String>>() {public Tuple2<String, Tuple2<String, String>> call(Tuple2<String, String> t) throws Exception {// TODO Auto-generated method stubString userID = t._2.split("\t")[2];return new Tuple2<String, Tuple2<String,String>>(userID, t);} });JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> joined = rdd2Pair.leftOuterJoin(blackListRDD);JavaPairRDD<String, String> result = joined.filter(new Function<Tuple2<String,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, Boolean>() {public Boolean call(Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> tuple)throws Exception {// TODO Auto-generated method stubOptional<Boolean> optional = tuple._2._2;if(optional.isPresent() && optional.get()){return false;} else {return true;} }}).mapToPair(new PairFunction<Tuple2<String,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, String, String>() {public Tuple2<String, String> call(Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> t)throws Exception {// TODO Auto-generated method stubreturn t._2._1;} });return result;} });//广告点击的基本数据格式:timestamp、ip、userID、adID、province、cityJavaPairDStream<String, Long> pairs = filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {public Tuple2<String, Long> call(Tuple2<String, String> t) throws Exception {String[] splited=t._2.split("\t");String timestamp = splited[0]; //YYYY-MM-DDString ip = splited[1];String userID = splited[2];String adID = splited[3];String province = splited[4];String city = splited[5]; String clickedRecord = timestamp + "_" +ip + "_"+userID+"_"+adID+"_"+province +"_"+city;return new Tuple2<String, Long>(clickedRecord, 1L);} });/ 第4.3步:在单词实例计数为1基础上,统计每个单词在文件中出现的总次数/JavaPairDStream<String, Long> adClickedUsers= pairs.reduceByKey(new Function2<Long, Long, Long>() {public Long call(Long i1, Long i2) throws Exception{return i1 + i2;} });/判断有效的点击,复杂化的采用机器学习训练模型进行在线过滤 简单的根据ip判断1天不超过100次;也可以通过一个batch duration的点击次数判断是否非法广告点击,通过一个batch来判断是不完整的,还需要一天的数据也可以每一个小时来判断。/JavaPairDStream<String, Long> filterClickedBatch = adClickedUsers.filter(new Function<Tuple2<String,Long>, Boolean>() {public Boolean call(Tuple2<String, Long> v1) throws Exception {if (1 < v1._2){//更新一些黑名单的数据库表return false;} else { return true;} }});//filterClickedBatch.print();//写入数据库filterClickedBatch.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {public Void call(JavaPairRDD<String, Long> rdd) throws Exception {rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {//使用数据库连接池的高效读写数据库的方式将数据写入数据库mysql//例如一次插入 1000条 records,使用insertBatch 或 updateBatch//插入的用户数据信息:userID,adID,clickedCount,time//这里面有一个问题,可能出现两条记录的key是一样的,此时需要更新累加操作List<UserAdClicked> userAdClickedList = new ArrayList<UserAdClicked>();while(partition.hasNext()) {Tuple2<String, Long> record = partition.next();String[] splited = record._1.split("\t");UserAdClicked userClicked = new UserAdClicked();userClicked.setTimestamp(splited[0]);userClicked.setIp(splited[1]);userClicked.setUserID(splited[2]);userClicked.setAdID(splited[3]);userClicked.setProvince(splited[4]);userClicked.setCity(splited[5]);userAdClickedList.add(userClicked);}final List<UserAdClicked> inserting = new ArrayList<UserAdClicked>();final List<UserAdClicked> updating = new ArrayList<UserAdClicked>();JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();//表的字段timestamp、ip、userID、adID、province、city、clickedCountfor(final UserAdClicked clicked : userAdClickedList) {jdbcWrapper.doQuery("SELECT clickedCount FROM adclicked WHERE"+ " timestamp =? AND userID = ? AND adID = ?",new Object[]{clicked.getTimestamp(), clicked.getUserID(),clicked.getAdID()}, new ExecuteCallBack() {public void resultCallBack(ResultSet result) throws Exception {// TODO Auto-generated method stubif(result.next()) {long count = result.getLong(1);clicked.setClickedCount(count);updating.add(clicked);} else {inserting.add(clicked);clicked.setClickedCount(1L);} }});}//表的字段timestamp、ip、userID、adID、province、city、clickedCountList<Object[]> insertParametersList = new ArrayList<Object[]>();for(UserAdClicked insertRecord : inserting) {insertParametersList.add(new Object[] {insertRecord.getTimestamp(),insertRecord.getIp(),insertRecord.getUserID(),insertRecord.getAdID(),insertRecord.getProvince(),insertRecord.getCity(),insertRecord.getClickedCount()});}jdbcWrapper.doBatch("INSERT INTO adclicked VALUES(?, ?, ?, ?, ?, ?, ?)", insertParametersList);//表的字段timestamp、ip、userID、adID、province、city、clickedCountList<Object[]> updateParametersList = new ArrayList<Object[]>();for(UserAdClicked updateRecord : updating) {updateParametersList.add(new Object[] {updateRecord.getTimestamp(),updateRecord.getIp(),updateRecord.getUserID(),updateRecord.getAdID(),updateRecord.getProvince(),updateRecord.getCity(),updateRecord.getClickedCount() + 1});}jdbcWrapper.doBatch("UPDATE adclicked SET clickedCount = ? WHERE"+ " timestamp =? AND ip = ? AND userID = ? AND adID = ? "+ "AND province = ? AND city = ?", updateParametersList);} });return null;} });//再次过滤,从数据库中读取数据过滤黑名单JavaPairDStream<String, Long> blackListBasedOnHistory = filterClickedBatch.filter(new Function<Tuple2<String,Long>, Boolean>() {public Boolean call(Tuple2<String, Long> v1) throws Exception {//广告点击的基本数据格式:timestamp,ip,userID,adID,province,cityString[] splited = v1._1.split("\t"); //提取key值String date =splited[0];String userID =splited[2];String adID =splited[3];//查询一下数据库同一个用户同一个广告id点击量超过50次列入黑名单//接下来 根据date、userID、adID条件去查询用户点击广告的数据表,获得总的点击次数//这个时候基于点击次数判断是否属于黑名单点击int clickedCountTotalToday = 81 ;if (clickedCountTotalToday > 50) {return true;}else {return false ;} }});//map操作,找出用户的idJavaDStream<String> blackListuserIDBasedInBatchOnhistroy =blackListBasedOnHistory.map(new Function<Tuple2<String,Long>, String>() {public String call(Tuple2<String, Long> v1) throws Exception {// TODO Auto-generated method stubreturn v1._1.split("\t")[2];} });//有一个问题,数据可能重复,在一个partition里面重复,这个好办;//但多个partition不能保证一个用户重复,需要对黑名单的整个rdd进行去重操作。//rdd去重了,partition也就去重了,一石二鸟,一箭双雕// 找出了黑名单,下一步就写入黑名单数据库表中JavaDStream<String> blackListUniqueuserBasedInBatchOnhistroy = blackListuserIDBasedInBatchOnhistroy.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {// TODO Auto-generated method stubreturn rdd.distinct();} });// 下一步写入到数据表中blackListUniqueuserBasedInBatchOnhistroy.foreachRDD(new Function<JavaRDD<String>, Void>() {public Void call(JavaRDD<String> rdd) throws Exception {rdd.foreachPartition(new VoidFunction<Iterator<String>>() {public void call(Iterator<String> t) throws Exception {// TODO Auto-generated method stub//插入的用户信息可以只包含:useID//此时直接插入黑名单数据表即可。//写入数据库List<Object[]> blackList = new ArrayList<Object[]>();while(t.hasNext()) {blackList.add(new Object[]{t.next()});}JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();jdbcWrapper.doBatch("INSERT INTO blacklisttable values (?)", blackList);} });return null;} });/广告点击累计动态更新,每个updateStateByKey都会在Batch Duration的时间间隔的基础上进行广告点击次数的更新, 更新之后我们一般都会持久化到外部存储设备上,在这里我们存储到MySQL数据库中/JavaPairDStream<String, Long> updateStateByKeyDSteam = filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {public Tuple2<String, Long> call(Tuple2<String, String> t)throws Exception {String[] splited=t._2.split("\t");String timestamp = splited[0]; //YYYY-MM-DDString ip = splited[1];String userID = splited[2];String adID = splited[3];String province = splited[4];String city = splited[5]; String clickedRecord = timestamp + "_" +ip + "_"+userID+"_"+adID+"_"+province +"_"+city;return new Tuple2<String, Long>(clickedRecord, 1L);} }).updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {public Optional<Long> call(List<Long> v1, Optional<Long> v2)throws Exception {// v1:当前的Key在当前的Batch Duration中出现的次数的集合,例如{1,1,1,。。。,1}// v2:当前的Key在以前的Batch Duration中积累下来的结果;Long clickedTotalHistory = 0L; if(v2.isPresent()){clickedTotalHistory = v2.get();}for(Long one : v1) {clickedTotalHistory += one;}return Optional.of(clickedTotalHistory);} });updateStateByKeyDSteam.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {public Void call(JavaPairRDD<String, Long> rdd) throws Exception {rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {//使用数据库连接池的高效读写数据库的方式将数据写入数据库mysql//例如一次插入 1000条 records,使用insertBatch 或 updateBatch//插入的用户数据信息:timestamp、adID、province、city//这里面有一个问题,可能出现两条记录的key是一样的,此时需要更新累加操作List<AdClicked> AdClickedList = new ArrayList<AdClicked>();while(partition.hasNext()) {Tuple2<String, Long> record = partition.next();String[] splited = record._1.split("\t");AdClicked adClicked = new AdClicked();adClicked.setTimestamp(splited[0]);adClicked.setAdID(splited[1]);adClicked.setProvince(splited[2]);adClicked.setCity(splited[3]);adClicked.setClickedCount(record._2);AdClickedList.add(adClicked);}final List<AdClicked> inserting = new ArrayList<AdClicked>();final List<AdClicked> updating = new ArrayList<AdClicked>();JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();//表的字段timestamp、ip、userID、adID、province、city、clickedCountfor(final AdClicked clicked : AdClickedList) {jdbcWrapper.doQuery("SELECT clickedCount FROM adclickedcount WHERE"+ " timestamp = ? AND adID = ? AND province = ? AND city = ?",new Object[]{clicked.getTimestamp(), clicked.getAdID(),clicked.getProvince(), clicked.getCity()}, new ExecuteCallBack() {public void resultCallBack(ResultSet result) throws Exception {// TODO Auto-generated method stubif(result.next()) {long count = result.getLong(1);clicked.setClickedCount(count);updating.add(clicked);} else {inserting.add(clicked);clicked.setClickedCount(1L);} }});}//表的字段timestamp、ip、userID、adID、province、city、clickedCountList<Object[]> insertParametersList = new ArrayList<Object[]>();for(AdClicked insertRecord : inserting) {insertParametersList.add(new Object[] {insertRecord.getTimestamp(),insertRecord.getAdID(),insertRecord.getProvince(),insertRecord.getCity(),insertRecord.getClickedCount()});}jdbcWrapper.doBatch("INSERT INTO adclickedcount VALUES(?, ?, ?, ?, ?)", insertParametersList);//表的字段timestamp、ip、userID、adID、province、city、clickedCountList<Object[]> updateParametersList = new ArrayList<Object[]>();for(AdClicked updateRecord : updating) {updateParametersList.add(new Object[] {updateRecord.getClickedCount(),updateRecord.getTimestamp(),updateRecord.getAdID(),updateRecord.getProvince(),updateRecord.getCity()});}jdbcWrapper.doBatch("UPDATE adclickedcount SET clickedCount = ? WHERE"+ " timestamp =? AND adID = ? AND province = ? AND city = ?", updateParametersList);} });return null;} });/ 对广告点击进行TopN计算,计算出每天每个省份Top5排名的广告 因为我们直接对RDD进行操作,所以使用了transfomr算子;/updateStateByKeyDSteam.transform(new Function<JavaPairRDD<String,Long>, JavaRDD<Row>>() {public JavaRDD<Row> call(JavaPairRDD<String, Long> rdd) throws Exception {JavaRDD<Row> rowRDD = rdd.mapToPair(new PairFunction<Tuple2<String,Long>, String, Long>() {public Tuple2<String, Long> call(Tuple2<String, Long> t)throws Exception {// TODO Auto-generated method stubString[] splited=t._1.split("_");String timestamp = splited[0]; //YYYY-MM-DDString adID = splited[3];String province = splited[4];String clickedRecord = timestamp + "_" + adID + "_" + province;return new Tuple2<String, Long>(clickedRecord, t._2);} }).reduceByKey(new Function2<Long, Long, Long>() {public Long call(Long v1, Long v2) throws Exception {// TODO Auto-generated method stubreturn v1 + v2;} }).map(new Function<Tuple2<String,Long>, Row>() {public Row call(Tuple2<String, Long> v1) throws Exception {// TODO Auto-generated method stubString[] splited=v1._1.split("_");String timestamp = splited[0]; //YYYY-MM-DDString adID = splited[3];String province = splited[4];return RowFactory.create(timestamp, adID, province, v1._2);} });StructType structType = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("timestamp", DataTypes.StringType, true),DataTypes.createStructField("adID", DataTypes.StringType, true),DataTypes.createStructField("province", DataTypes.StringType, true),DataTypes.createStructField("clickedCount", DataTypes.LongType, true)));HiveContext hiveContext = new HiveContext(rdd.context());DataFrame df = hiveContext.createDataFrame(rowRDD, structType);df.registerTempTable("topNTableSource");DataFrame result = hiveContext.sql("SELECT timestamp, adID, province, clickedCount, FROM"+ " (SELECT timestamp, adID, province,clickedCount, "+ "ROW_NUMBER() OVER(PARTITION BY province ORDER BY clickeCount DESC) rank "+ "FROM topNTableSource) subquery "+ "WHERE rank <= 5");return result.toJavaRDD();} }).foreachRDD(new Function<JavaRDD<Row>, Void>() {public Void call(JavaRDD<Row> rdd) throws Exception {// TODO Auto-generated method stubrdd.foreachPartition(new VoidFunction<Iterator<Row>>() {public void call(Iterator<Row> t) throws Exception {// TODO Auto-generated method stubList<AdProvinceTopN> adProvinceTopN = new ArrayList<AdProvinceTopN>();while(t.hasNext()) {Row row = t.next();AdProvinceTopN item = new AdProvinceTopN();item.setTimestamp(row.getString(0));item.setAdID(row.getString(1));item.setProvince(row.getString(2));item.setClickedCount(row.getLong(3));adProvinceTopN.add(item);}// final List<AdProvinceTopN> inserting = new ArrayList<AdProvinceTopN>();// final List<AdProvinceTopN> updating = new ArrayList<AdProvinceTopN>();JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();Set<String> set = new HashSet<String>();for(AdProvinceTopN item: adProvinceTopN){set.add(item.getTimestamp() + "_" + item.getProvince());}//表的字段timestamp、adID、province、clickedCountArrayList<Object[]> deleteParametersList = new ArrayList<Object[]>();for(String deleteRecord : set) {String[] splited = deleteRecord.split("_");deleteParametersList.add(new Object[]{splited[0],splited[1]});}jdbcWrapper.doBatch("DELETE FROM adprovincetopn WHERE timestamp = ? AND province = ?", deleteParametersList);//表的字段timestamp、ip、userID、adID、province、city、clickedCountList<Object[]> insertParametersList = new ArrayList<Object[]>();for(AdProvinceTopN insertRecord : adProvinceTopN) {insertParametersList.add(new Object[] {insertRecord.getClickedCount(),insertRecord.getTimestamp(),insertRecord.getAdID(),insertRecord.getProvince()});}jdbcWrapper.doBatch("INSERT INTO adprovincetopn VALUES (?, ?, ?, ?)", insertParametersList);} });return null;} });/ 计算过去半个小时内广告点击的趋势 广告点击的基本数据格式:timestamp、ip、userID、adID、province、city/filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {public Tuple2<String, Long> call(Tuple2<String, String> t)throws Exception {String splited[] = t._2.split("\t");String adID = splited[3];String time = splited[0]; //Todo:后续需要重构代码实现时间戳和分钟的转换提取。此处需要提取出该广告的点击分钟单位return new Tuple2<String, Long>(time + "_" + adID, 1L);} }).reduceByKeyAndWindow(new Function2<Long, Long, Long>() {public Long call(Long v1, Long v2) throws Exception {// TODO Auto-generated method stubreturn v1 + v2;} }, new Function2<Long, Long, Long>() {public Long call(Long v1, Long v2) throws Exception {// TODO Auto-generated method stubreturn v1 - v2;} }, Durations.minutes(30), Durations.milliseconds(5)).foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {public Void call(JavaPairRDD<String, Long> rdd) throws Exception {// TODO Auto-generated method stubrdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {public void call(Iterator<Tuple2<String, Long>> partition)throws Exception {List<AdTrendStat> adTrend = new ArrayList<AdTrendStat>();// TODO Auto-generated method stubwhile(partition.hasNext()) {Tuple2<String, Long> record = partition.next();String[] splited = record._1.split("_");String time = splited[0];String adID = splited[1];Long clickedCount = record._2;/ 在插入数据到数据库的时候具体需要哪些字段?time、adID、clickedCount; 而我们通过J2EE技术进行趋势绘图的时候肯定是需要年、月、日、时、分这个维度的,所以我们在这里需要 年月日、小时、分钟这些时间维度;/AdTrendStat adTrendStat = new AdTrendStat();adTrendStat.setAdID(adID);adTrendStat.setClickedCount(clickedCount);adTrendStat.set_date(time); //Todo:获取年月日adTrendStat.set_hour(time); //Todo:获取小时adTrendStat.set_minute(time);//Todo:获取分钟adTrend.add(adTrendStat);}final List<AdTrendStat> inserting = new ArrayList<AdTrendStat>();final List<AdTrendStat> updating = new ArrayList<AdTrendStat>();JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();//表的字段timestamp、ip、userID、adID、province、city、clickedCountfor(final AdTrendStat trend : adTrend) {final AdTrendCountHistory adTrendhistory = new AdTrendCountHistory();jdbcWrapper.doQuery("SELECT clickedCount FROM adclickedtrend WHERE"+ " date =? AND hour = ? AND minute = ? AND AdID = ?",new Object[]{trend.get_date(), trend.get_hour(), trend.get_minute(),trend.getAdID()}, new ExecuteCallBack() {public void resultCallBack(ResultSet result) throws Exception {// TODO Auto-generated method stubif(result.next()) {long count = result.getLong(1);adTrendhistory.setClickedCountHistoryLong(count);updating.add(trend);} else { inserting.add(trend);} }});}//表的字段date、hour、minute、adID、clickedCountList<Object[]> insertParametersList = new ArrayList<Object[]>();for(AdTrendStat insertRecord : inserting) {insertParametersList.add(new Object[] {insertRecord.get_date(),insertRecord.get_hour(),insertRecord.get_minute(),insertRecord.getAdID(),insertRecord.getClickedCount()});}jdbcWrapper.doBatch("INSERT INTO adclickedtrend VALUES(?, ?, ?, ?, ?)", insertParametersList);//表的字段date、hour、minute、adID、clickedCountList<Object[]> updateParametersList = new ArrayList<Object[]>();for(AdTrendStat updateRecord : updating) {updateParametersList.add(new Object[] {updateRecord.getClickedCount(),updateRecord.get_date(),updateRecord.get_hour(),updateRecord.get_minute(),updateRecord.getAdID()});}jdbcWrapper.doBatch("UPDATE adclickedtrend SET clickedCount = ? WHERE"+ " date =? AND hour = ? AND minute = ? AND AdID = ?", updateParametersList);} });return null;} });;/ Spark Streaming 执行引擎也就是Driver开始运行,Driver启动的时候是位于一条新的线程中的,当然其内部有消息循环体,用于 接收应用程序本身或者Executor中的消息,/javassc.start();javassc.awaitTermination();javassc.close();}private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf) {// If you do not see this printed, that means the StreamingContext has been loaded// from the new checkpointSystem.out.println("Creating new context");// Create the context with a 5 second batch sizeJavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));ssc.checkpoint(checkpointDirectory);return ssc;} }class JDBCWrapper {private static JDBCWrapper jdbcInstance = null;private static LinkedBlockingQueue<Connection> dbConnectionPool = new LinkedBlockingQueue<Connection>();static {try {Class.forName("com.mysql.jdbc.Driver");} catch (ClassNotFoundException e) {// TODO Auto-generated catch blocke.printStackTrace();} }public static JDBCWrapper getJDBCInstance() {if(jdbcInstance == null) {synchronized (JDBCWrapper.class) {if(jdbcInstance == null) {jdbcInstance = new JDBCWrapper();} }}return jdbcInstance; }private JDBCWrapper() {for(int i = 0; i < 10; i++){try {Connection conn = DriverManager.getConnection("jdbc:mysql://Master:3306/sparkstreaming","root", "root");dbConnectionPool.put(conn);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();} } }public synchronized Connection getConnection() {while(0 == dbConnectionPool.size()){try {Thread.sleep(20);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} }return dbConnectionPool.poll();}public int[] doBatch(String sqlText, List<Object[]> paramsList){Connection conn = getConnection();PreparedStatement preparedStatement = null;int[] result = null;try {conn.setAutoCommit(false);preparedStatement = conn.prepareStatement(sqlText);for(Object[] parameters: paramsList) {for(int i = 0; i < parameters.length; i++){preparedStatement.setObject(i + 1, parameters[i]);} preparedStatement.addBatch();}result = preparedStatement.executeBatch();conn.commit();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {if(preparedStatement != null) {try {preparedStatement.close();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} }if(conn != null) {try {dbConnectionPool.put(conn);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} }}return result; }public void doQuery(String sqlText, Object[] paramsList, ExecuteCallBack callback){Connection conn = getConnection();PreparedStatement preparedStatement = null;ResultSet result = null;try {preparedStatement = conn.prepareStatement(sqlText);for(int i = 0; i < paramsList.length; i++){preparedStatement.setObject(i + 1, paramsList[i]);} result = preparedStatement.executeQuery();try {callback.resultCallBack(result);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();} } catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {if(preparedStatement != null) {try {preparedStatement.close();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} }if(conn != null) {try {dbConnectionPool.put(conn);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} }} }}interface ExecuteCallBack {void resultCallBack(ResultSet result) throws Exception;}class UserAdClicked {private String timestamp;private String ip;private String userID;private String adID;private String province;private String city;private Long clickedCount;public String getTimestamp() {return timestamp;}public void setTimestamp(String timestamp) {this.timestamp = timestamp;}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}public String getUserID() {return userID;}public void setUserID(String userID) {this.userID = userID;}public String getAdID() {return adID;}public void setAdID(String adID) {this.adID = adID;}public String getProvince() {return province;}public void setProvince(String province) {this.province = province;}public String getCity() {return city;}public void setCity(String city) {this.city = city;}public Long getClickedCount() {return clickedCount;}public void setClickedCount(Long clickedCount) {this.clickedCount = clickedCount;} }class AdClicked {private String timestamp;private String adID;private String province;private String city;private Long clickedCount;public String getTimestamp() {return timestamp;}public void setTimestamp(String timestamp) {this.timestamp = timestamp;}public String getAdID() {return adID;}public void setAdID(String adID) {this.adID = adID;}public String getProvince() {return province;}public void setProvince(String province) {this.province = province;}public String getCity() {return city;}public void setCity(String city) {this.city = city;}public Long getClickedCount() {return clickedCount;}public void setClickedCount(Long clickedCount) {this.clickedCount = clickedCount;} }class AdProvinceTopN {private String timestamp;private String adID;private String province;private Long clickedCount;public String getTimestamp() {return timestamp;}public void setTimestamp(String timestamp) {this.timestamp = timestamp;}public String getAdID() {return adID;}public void setAdID(String adID) {this.adID = adID;}public String getProvince() {return province;}public void setProvince(String province) {this.province = province;}public Long getClickedCount() {return clickedCount;}public void setClickedCount(Long clickedCount) {this.clickedCount = clickedCount;} }class AdTrendStat {private String _date;private String _hour;private String _minute;private String adID;private Long clickedCount;public String get_date() {return _date;}public void set_date(String _date) {this._date = _date;}public String get_hour() {return _hour;}public void set_hour(String _hour) {this._hour = _hour;}public String get_minute() {return _minute;}public void set_minute(String _minute) {this._minute = _minute;}public String getAdID() {return adID;}public void setAdID(String adID) {this.adID = adID;}public Long getClickedCount() {return clickedCount;}public void setClickedCount(Long clickedCount) {this.clickedCount = clickedCount;} }class AdTrendCountHistory{private Long clickedCountHistoryLong;public Long getClickedCountHistoryLong() {return clickedCountHistoryLong;}public void setClickedCountHistoryLong(Long clickedCountHistoryLong) {this.clickedCountHistoryLong = clickedCountHistoryLong;} } 本篇文章为转载内容。原文链接:https://blog.csdn.net/tom_8899_li/article/details/71194434。 该文由互联网用户投稿提供,文中观点代表作者本人意见,并不代表本站的立场。 作为信息平台,本站仅提供文章转载服务,并不拥有其所有权,也不对文章内容的真实性、准确性和合法性承担责任。 如发现本文存在侵权、违法、违规或事实不符的情况,请及时联系我们,我们将第一时间进行核实并删除相应内容。
2023-02-14 19:16:35
297
转载
Java
...源中文分词工具,广泛应用于Elasticsearch和Solr中。 Ansj:一个高效的Java分词框架,支持多种分词模式如最大匹配、最小切分等。 Stanford Segmenter:斯坦福大学提供的分词器,基于统计模型和规则,具有较高的准确性。 FudanNLP分词器:复旦大学自然语言处理小组研发的分词系统。 jieba分词:Python社区中流行的开源中文分词库,支持精确模式、全模式、搜索引擎模式等多种分词模式。 LTP(哈工大 Language Technology Platform)分词器:哈尔滨工业大学开发的一套全面的自然语言处理工具包,其中包含高质量的分词模块。 THULAC:由清华大学自然语言处理与社会人文计算实验室推出的分词和词性标注工具。 HanLP:由李航团队开发的自然语言处理库,包含高效准确的分词组件。
2024-01-27 19:37:56
371
admin-tim
JSON
...后端分离架构的普及和API经济的发展,JSON不仅在PHP中,更是在JavaScript、Python、Java等多种编程语言中被广泛应用。例如,在Node.js环境中,JSON与JavaScript无缝对接,极大地简化了数据处理流程。 近期,PHP社区发布了新版本PHP 8.1,对JSON支持进行了进一步优化,引入了新的函数json_serialize以增强序列化功能,并改进了json_decode错误处理机制,使开发者能够更准确地捕获并解决JSON解析问题。同时,PHP官方文档也提供了更多关于如何安全、高效地处理JSON数据的最佳实践指南。 此外,随着RESTful API设计规范的推广,JSON Schema作为一种用于描述JSON数据结构的标准格式,正在逐步成为主流。它允许开发者为JSON数据定义严格的模式约束,从而确保在数据传输过程中满足预设规则,减少因数据格式错误导致的问题。 因此,对于PHP开发者而言,除了掌握基础的JSON编码解码操作之外,了解并紧跟相关领域的最新动态和技术发展,如PHP 8.1对JSON处理的改进以及JSON Schema的应用,无疑将有助于提升开发效率和代码质量,更好地适应现代Web开发的需求。
2023-01-18 13:53:09
461
算法侠
Python
... schedule是Python中的一款开源定时任务库,它允许用户在特定时间点执行预定义的函数或方法。在本文上下文中,通过导入并使用schedule库,开发者可以轻松地创建每日定时任务,只需设定任务执行的时间规则以及要执行的任务函数即可。 cron表达式 , 虽然文章中未直接提到cron表达式,但在定时任务领域,cron表达式是一种用于配置周期性任务执行时间的标准格式。通常在Unix/Linux系统中,cron服务会根据用户的cron配置文件来定期执行相应的命令或脚本。尽管Python的schedule库并未直接采用cron表达式,但其提供的API(如every().day.at())具有类似功能,可设置任务按日、时、分等粒度进行重复执行。 time模块 , 在Python编程语言中,time模块是一个内置的标准库,提供了处理时间和日期相关操作的功能。在本文中,time模块主要用来配合schedule库实现定时任务的执行,通过time.sleep(1)函数使程序暂停一秒,然后检查是否有计划的任务需要执行,以此形成一个循环机制,确保定时任务能在指定时间准确无误地运行。
2023-01-01 19:28:30
351
软件工程师
JSON
在理解了如何使用Python的json模块将JSON数据转换为字典和列表之后,进一步了解JSON在现代编程实践中的应用及其重要性是十分必要的。JSON因其简洁、易于阅读和编写的特点,已成为API接口、Web服务以及数据库传输等场景下首选的数据交换格式。 近期(时效性),GitHub于2022年推出了改进后的GraphQL API,它支持JSON格式的数据交互,允许开发者更高效地查询和获取所需数据,这无疑再次印证了JSON在数据交换领域的主导地位。同时,随着Python 3.9及更高版本对JSON模块功能的持续优化,如添加对datetime对象的原生支持,使得JSON与Python类型之间的转换更为便捷且兼容性更强。 此外,深入探究JSON安全方面的话题也具有现实意义。由于JSON常用于处理用户输入或从外部源获取的数据,因此确保其安全性至关重要。例如,防范JSON注入攻击需要对解析JSON时进行严格的输入验证和清理。而在Python中,合理使用json.loads()方法配合object_hook参数可以实现对潜在恶意内容的有效检测和拦截。 综上所述,掌握Python中JSON的处理不仅限于基础的编码解码操作,还应关注其在实际开发中的应用场景、最新技术动态以及相关的安全问题,以提升代码质量及应用程序的安全防护能力。
2024-03-03 16:01:36
529
码农
Docker
...快速构建、发布和执行应用软件。其中一个重要的特性是能够与宿主机共享网络,使得Docker容器能够与宿主机网卡进行通讯,达成网络连接。 $ docker run -it --net=host imageName 可以使用上面的指令来执行一个Docker容器,其中--net=host选项许可容器共享宿主机的网络命名空间,即使用宿主机的网络栈。 例如,如果你有一个Python应用软件在容器中执行,并且需要连接宿主机上的MySQL数据库,则可以使用以下代码来连接: import mysql.connector cnx = mysql.connector.connect(user='root', password='password', host='127.0.0.1', database='test', auth_plugin='mysql_native_password') cursor = cnx.cursor() 在这个示例中,Python应用软件在容器中执行,但是与宿主机共享网络,因此可以连接到宿主机上的MySQL数据库。 总而言之,在Docker中与宿主机共享网络是非常容易的。只需使用--net=host选项执行容器即可达成。这个特性在很多场景下非常有用,如连接数据库、调用API等。
2023-03-28 21:41:55
589
逻辑鬼才
转载文章
...程序代码(如PHP、Python或Java)与HTML或其他格式的文档分离,通过变量替换、控制结构等机制动态生成最终输出给用户的网页内容。在本文中,Smarty就是一种模板引擎的具体实现。 capture内置函数 , capture是Smarty模板引擎提供的一个内置函数,允许开发者捕获并存储模板中特定范围内的输出内容到一个变量中,而非直接输出到页面上。capture函数有三种用法。
2023-12-03 17:52:39
79
转载
Python
在深入理解Python中正则表达式的强大功能后,我们可以进一步探索这一领域在实际开发和数据分析中的最新应用。例如,在2023年初,GitHub上一个热门的开源项目“RegExPlus”就引入了对Python正则表达式的新颖扩展,它提供了一套易于理解和使用的API,使得开发者能够更加高效地处理复杂文本模式匹配任务。 此外,近期一篇发表于《计算机科学与技术》期刊上的学术论文探讨了如何优化Python正则表达式引擎以提升大数据环境下的搜索性能。研究团队通过深度剖析re模块的底层算法,并结合现代硬件特性进行了创新性改进,实现了显著的速度提升,这对于处理大规模文本数据具有重大意义。 同时,Python社区也在不断更新和完善其正则表达式教程资源。Python官方文档针对re模块进行了详尽更新,新增了许多实用案例和高级技巧说明,帮助开发者紧跟时代步伐,解决实际工作中遇到的各种字符串匹配难题。 对于有兴趣深入了解正则表达式理论基础的读者,推荐阅读由Jeffrey Friedl所著的《Mastering Regular Expressions》一书,该书以其丰富的示例和深入浅出的解析,被广大开发者誉为正则表达式领域的经典之作。通过研读此类资料,您不仅能深化对Python中正则表达式的掌握,还能将其应用于更多跨语言、跨平台的场景,从而提升自身在文本挖掘、数据分析等领域的专业技能。
2023-08-02 16:27:28
304
代码侠
MySQL
...MS;,在各种互联网应用、大型企业系统中得到了广泛应用。如今,鉴于云技术、海量数据等技术的积极推进,MySQL也持续发展,提供了各种访问MySQL的方法。 //采用Python访问MySQL import mysql.connector mydb = mysql.connector.connect( host="localhost", user="yourusername", password="yourpassword", database="yourdatabase" ) mycursor = mydb.cursor() mycursor.execute("SELECT FROM customers") myresult = mycursor.fetchall() for x in myresult: print(x) //采用Java访问MySQL import java.sql.; public class ReadMySQL { public static void main(String[] args) { try { Connection myConn = DriverManager.getConnection("jdbc:mysql://localhost:3306/yourdatabase", "yourusername", "yourpassword"); Statement myStmt = myConn.createStatement(); ResultSet myRs = myStmt.executeQuery("SELECT FROM customers"); while (myRs.next()) { System.out.println(myRs.getString("name") + "," + myRs.getString("email")); } } catch (Exception exc) { exc.printStackTrace(); } } } 以上是采用Python和Java访问MySQL的示例,访问MySQL还可以采用其他编程语言,如PHP、Ruby等。同时,为了提高MySQL的访问效率,也可以引入缓存技术,如Memcached、Redis等。
2024-02-28 15:31:14
130
逻辑鬼才
JSON
...代Web开发中的广泛应用,不仅限于JavaScript环境,各大主流编程语言如Python、Java、C等也纷纷提供了对JSON的强大支持。例如,Python中使用json模块的dumps()和loads()函数实现与JavaScript类似的序列化和反序列化操作。而在Java环境中,Jackson库是处理JSON数据的首选工具,它提供了高效且功能丰富的API进行对象与JSON之间的转换。 近年来,随着API经济的发展,JSON作为HTTP请求和响应的标准格式,其重要性进一步提升。例如,GraphQL这一现代API查询语言就以JSON格式传递查询和结果,提供了一种更灵活、高效的客户端与服务器间数据交互方式。 此外,在数据存储领域,MongoDB等NoSQL数据库更是直接以内置支持JSON的BSON格式存储文档型数据,极大地简化了数据模型设计和查询过程。 值得注意的是,随着JSON Web Tokens(JWT)的普及,JSON还在安全认证方面发挥着关键作用。JWT利用JSON格式封装用户信息,通过加密算法保证数据传输的安全性,被广泛应用于无状态、跨域的身份验证场景。 总之,从数据交换、API设计到数据存储和安全认证,JSON已成为现代软件开发不可或缺的一部分。深入理解和掌握JSON及其相关工具和技术,对于提升开发者工作效率和应用性能具有重要意义。未来,随着技术演进及新应用场景的拓展,JSON的作用和影响力预计将进一步增强。
2023-12-14 20:46:43
491
程序媛
ElasticSearch
在深入了解ElasticSearch的Painless scripting功能之后,我们发现脚本语言在现代大数据处理与分析领域的重要性日益凸显。近期,Elastic公司发布了Elasticsearch 7.15版本,对Painless scripting进行了更多优化和增强,引入了新的API、函数以及性能改进,使得用户能够更加高效、安全地执行复杂的数据操作。 实际应用中,某知名电商企业就在其日志分析系统中充分利用了Painless scripting的强大功能,实现了对海量用户行为数据的实时筛选、转换和聚合分析,有效提升了用户体验并优化了业务决策流程。这一成功案例不仅验证了ElasticSearch在大规模数据分析场景下的实力,也展示了Painless scripting在解决实际问题中的巨大潜力。 此外,为了帮助开发者更好地掌握Painless scripting,社区内涌现出众多教程资源和技术博客,如“深入浅出Elasticsearch Painless scripting”系列文章,从基础语法到实战技巧,为读者提供了详尽的学习指南和实践路径。 总的来看,随着技术的发展与应用场景的拓展,ElasticSearch及其Painless scripting将继续在搜索优化、数据分析乃至AIops等领域发挥关键作用,值得广大技术人员持续关注和学习。
2023-02-04 22:33:34
479
风轻云淡-t
Python
在深入理解Python框架如Flask中表单提交功能的应用之后,我们可以进一步探索这一领域的新动态和实践案例。近期,Django 3.2版本的发布为Python Web开发带来了更强大的表单处理能力,其内置的Form类提供了全面的数据验证、清理和格式化机制,大大增强了数据安全性和应用稳健性。同时,随着API First理念的普及,RESTful架构设计下的表单提交也愈发重要,通过使用诸如Marshmallow这样的库,开发者可以在Python框架中实现更为灵活和规范的序列化与反序列化过程。 此外,针对Web应用的安全问题,OWASP(开放网络应用安全项目)定期发布的“十大Web应用安全风险”报告,其中就包括了“注入攻击”和“弱身份认证”等与表单提交密切相关的安全威胁。因此,在实际开发过程中,如何结合Python框架提供的功能对用户输入进行严格过滤和加密处理,以防止SQL注入、跨站脚本攻击等安全漏洞,成为了开发者必须关注和掌握的核心技能之一。 另外,对于数据持久化的优化, SQLAlchemy等ORM框架在处理表单提交后的数据存储上发挥了关键作用,它们不仅简化了数据库操作,还能通过声明式方式执行复杂的SQL查询,从而提高应用性能并降低出错率。 综上所述,Python框架中的表单提交功能及其相关技术在不断发展和完善,了解最新的框架特性、遵循最佳安全实践,并结合高效的ORM工具,将有助于我们构建出更加安全、稳定且用户体验良好的Web应用。
2023-10-31 17:23:22
282
码农
JSON
...等特点,因此,被广泛应用在程序化测试中。 JSON程序化测试的环节,主要是通过采用代码进行程序化测试,并对JSON格式的数据进行程序化处理。检测代码编写结束后,可以直接整合进持续构建工具中,在每次提交代码后自动执行。 下面是一个使用Python语言进行JSON程序化测试的例子: import requests import json def test_api(): headers = {'Content-Type': 'application/json'} data = {'name': 'test', 'age': '25'} response = requests.post('http://example.com/api/users', headers=headers, data=json.dumps(data)) assert response.status_code == 200 assert response.json().get('success') is True 在这个例子中,我们使用了Python中的requests库,来仿照发送一个POST方式请求。我们设置了请求的headers和data,借助于json.dumps()函数将data转换为JSON格式。在请求结束后,我们通过assert断言判断请求的返回状态码和JSON数据是否符合预期。如果测试案例执行成功,则代表接口调用正常。 总的来说,JSON程序化测试可以帮助我们实现快速、可靠和缩短测试时间等诸多优点。同时需要注意JSON格式的数据,需要符合规范,否则在数据处理环节中可能会出现意想不到的错误。
2023-12-07 16:32:59
499
软件工程师
Tesseract
...分出来。我们可以使用Python的PIL库来实现这个功能: python from PIL import ImageEnhance def preprocess_image(image_path): img = Image.open(image_path) enhancer = ImageEnhance.Contrast(img) contrast_img = enhancer.enhance(0.5) 设置增强系数 return contrast_img 此外,我们还可以尝试使用一些专门针对高对比度图像的OCR工具,如Google Vision API或者Amazon Textract。 三、低对比度图像的问题 3.1 问题描述 相反,当图像的对比度过低时,所有的颜色可能都接近于灰色,使得文本与背景之间的边界变得模糊。这种情况下,Tesseract也可能无法准确识别文本。 3.2 解决方案 同样,我们可以通过提高对比度来改善这种情况。但是需要注意的是,如果对比度过高,可能会导致之前提到的问题。因此,我们需要找到一个合适的平衡点。 另外,我们也可以考虑使用更复杂的算法来提高识别效果。比如说,咱们可以尝试用深度学习的招数,比如那个卷积神经网络(CNN),来给图片做“切块”处理,就像把一副画分割成不同的小部分,然后对这些切割出来的前景部分,我们再单独进行识别工作。 四、结论 总的来说,处理图像对比度过高或过低的问题主要依赖于图像预处理和识别算法的选择。在实际操作中,咱们得瞅准实际情况和具体需求,像挑衣服那样,灵活地找出最合身、最合适的策略来用。同时呢,眼瞅着深度学习这些新鲜技术日益精进,我们可真是满怀期待,盼望着能有更多神奇的解决方案蹦跶出来,让OCR的表现力再上一层楼。
2023-09-16 20:45:02
119
寂静森林-t
Mongo
...goDB数据库的实际应用中,字段类型不匹配的问题尤为常见,且可能引发数据处理错误及性能瓶颈。近期,随着NoSQL数据库的广泛应用以及数据来源的多元化,正确处理和转换数据类型显得更为关键。例如,在进行实时数据分析或大数据集成时,未经验证的数据类型可能会导致分析结果偏差,甚至触发程序异常。 在最新版本的MongoDB 5.0中,引入了更严格模式(Strict Mode)以帮助开发者更好地管理数据类型,确保插入文档的数据类型与集合schema定义一致。通过启用严格模式,MongoDB会在写入操作阶段就对字段类型进行校验,从而避免后续查询、分析过程中因类型不匹配带来的问题。 此外,对于从API、CSV文件或其他非结构化数据源导入数据至MongoDB的情况,推荐使用如Pandas库(Python)或JSON.parse()方法(JavaScript)等工具预先进行数据清洗和类型转换,确保数据格式合规。同时,结合Schema设计的最佳实践,如运用BSON数据类型和$convert aggregation operator,可以在很大程度上降低因字段类型不匹配引发的风险,提升数据操作效率和准确性。 因此,深入理解和掌握如何有效预防及解决MongoDB中的字段类型不匹配问题,是现代数据工程师与开发人员必备技能之一,有助于构建稳定可靠的数据平台,为业务决策提供精准支撑。
2023-12-16 08:42:04
184
幽谷听泉-t
Mongo
...DB更适应现代Web应用对灵活数据模型的需求,并且通常能提供更高的水平扩展能力和读写性能。 Bulk Write Operations , Bulk Write Operations是MongoDB提供的一个功能强大的API,允许用户在一个操作中执行多个写入操作,包括插入、更新和删除等。这个特性极大地提升了数据库批量操作的效率,同时提供了详细的错误报告和部分成功事务的支持,即使在处理大量数据时出现网络中断或其他问题,也能确保数据的一致性和完整性。 分片技术(Sharding) , 在MongoDB中,分片是一种水平扩展策略,用于将大型集合的数据分割成多个部分,这些部分分布在不同的服务器上,从而实现海量数据的存储与高效查询。通过分片,MongoDB能够将数据自动分散到集群中的多个分片节点,有效解决了单一节点存储容量和处理能力的瓶颈问题,进而支持TB甚至PB级别的数据规模,并保持良好的查询性能。
2023-09-16 14:14:15
146
心灵驿站-t
Tornado
...Tornado是一个Python Web框架和异步网络库,由FriendFeed开发,并于2009年开源。然而,在实际操作的时候,我们可能会遇到这么个情况:咱们的Tornado服务器突然不听话了,死活启动不了。 二、什么是Tornado? Tornado是一种用于构建可伸缩Web应用程序和非阻塞网络服务的Python库。它超级灵活,能够轻松应对海量的同时连接请求,而且在I/O操作这方面可是精心优化过的,所以特别适合那些需要实时交互的应用和服务场景。然而,跟其他软件一样,Tornado这家伙有时候也会闹点小脾气,比如它可能会出现个常见的问题——“Tornado服务器启动不起来啦”。 三、为什么会出现“Tornado服务器无法启动”的问题? 当我们在运行Tornado服务器时,如果出现“Tornado服务器无法启动”的错误,那么这通常意味着我们的服务器遇到了某种问题,无法正常启动并提供服务。这种情况可能有很多原因,以下是一些最常见的可能性: 1. 依赖包缺失 Tornado是一个依赖众多Python库的程序,如果我们没有正确安装或者缺少某些必要的依赖,那么就可能出现这个问题。 2. 路径配置错误 在运行Tornado服务器之前,我们需要进行一些路径配置,如果这些配置不正确,也可能导致服务器无法启动。 3. 系统资源不足 如果我们的系统资源(如内存、CPU等)不足以支持Tornado服务器的运行,那么服务器也可能无法启动。 四、如何解决“Tornado服务器无法启动”的问题? 当我们遇到“Tornado服务器无法启动”的问题时,我们应该首先尝试找出具体的原因,然后根据具体情况来解决问题。以下是一些可能的解决方案: 1. 检查依赖包 我们可以检查一下是否已经正确安装了所有的依赖包。如果没有,我们就需要安装它们。例如,我们可以通过pip来安装: python pip install tornado 2. 检查路径配置 我们需要确保我们的路径配置是正确的。例如,我们可以在代码中这样设置路径: python import os os.chdir("/path/to/your/project") 3. 检查系统资源 我们需要确保我们的系统资源足够支持Tornado服务器的运行。要是资源不够使了,咱们可能得考虑升级一下硬件设备,或者把咱们的代码整得更精简些,好让资源能省着点用。 五、总结 “Tornado服务器无法启动”是我们经常遇到的一个问题,但是只要我们找到了具体的原因,并采取相应的措施,就可以很容易地解决这个问题。另外呢,咱们也得学点日常的故障排除小窍门儿,这样一旦碰上问题,就能立马找到解冑方案,省得干着急。 六、参考资料 [1] Tornado官方文档: [2] Stack Overflow上的相关讨论: 注意:以上内容仅供参考,具体的操作方法需要根据实际情况进行调整。
2023-12-23 10:08:52
156
落叶归根-t
ElasticSearch
...模板帮助数据分析师在ElasticSearch中根据需要迅速定位并获取指定范围、类型或其他特定条件下的数据。 钻取(Drilldown)操作 , 在数据分析领域,钻取是指从概括性的高层面数据逐步深入到详细数据的过程。它允许用户从汇总数据开始,然后逐层向下探索更具体的数据细节。在Kibana中,通过设置和使用URL模板实现钻取操作,用户能够快速锁定并挖掘海量数据中的目标信息,提高分析效率。 ElasticSearch , Elasticsearch是一个基于Lucene构建的开源分布式全文搜索引擎,专为云计算环境设计,提供近实时搜索、分析以及存储数据的能力。在本文中,ElasticSearch是承载大数据分析的基础平台,与Kibana可视化工具结合使用,使得用户能够利用URL模板等高级功能高效地进行数据搜索和分析工作。
2023-08-09 23:59:55
494
雪域高原-t
Docker
...(简称K8s)的广泛应用,如何高效地收集、存储和分析大规模Docker容器集群产生的海量日志成为了热门话题。 例如,2023年春季,Elastic公司发布了新版Elasticsearch、Logstash和Kibana(ELK Stack),针对Kubernetes环境优化了日志管理功能,可以实时收集并可视化Docker容器日志,便于运维人员进行深度监控和故障排查。此外,业界也在积极研究和发展开源工具如Fluentd、Prometheus以及Grafana等,这些工具为Docker日志提供了强大的采集、过滤、分析能力,并能与各类云存储服务无缝对接,实现日志数据长期保存和合规性要求。 与此同时,容器可观测性领域也有了新的突破。OpenTelemetry项目提供了一套跨平台的标准和工具集,可统一收集包括容器日志在内的各项指标、跟踪和日志信息,大大提升了分布式系统中问题定位的效率和准确性。 在实际应用中,为了更好地满足微服务架构下容器日志的安全性和一致性需求,越来越多的企业开始采用服务网格技术如Istio来增强日志治理能力,通过统一的日志策略管理和审计,确保了容器环境下的日志安全性与合规性。 因此,在掌握Docker日志基本操作的基础上,关注日志领域的最新技术和解决方案,对于提升云原生环境下的运维效率与保障系统稳定性具有重要意义。不断学习和了解这些先进的日志处理手段,将有助于我们在日常工作中应对复杂场景,有效利用日志信息驱动系统的持续优化和改进。
2023-09-05 21:33:01
333
代码侠
Apache Pig
...进行数据分区 python -- 定义一个字段,用于数据分区 splitA = load 'input' as (value:chararray); -- 对于这个字段进行数据分区 splitA = group splitA by value; -- 保存结果 store splitA into 'output'; 2. 使用bucket()函数进行数据分桶 python -- 定义一个字段,用于数据分桶 bucketB = load 'input' as (value:chararray); -- 对于这个字段进行数据分桶 bucketB = bucket bucketB into bag{ $value } by toInt($value) div 10; -- 保存结果 store bucketB into 'output'; 五、总结 在处理大数据时,数据分区和分桶是必不可少的技术手段。它们可以帮助我们更快地访问和处理数据,从而提高性能和效率。在Apache Pig这个工具里头,我们可以直接用它自带的一些内置函数,轻轻松松就把这些功能给实现了,就像变魔术一样简单。我希望这篇文章能够帮助你更好地理解和利用Apache Pig的这些特性。如果你有任何问题,欢迎随时向我提问!
2023-06-07 10:29:46
431
雪域高原-t
Python
在Python数据可视化领域中,除了Matplotlib和plotly这两个广受欢迎的库之外,近年来还有其他一些绘图工具因其独特的优势崭露头角。例如Bokeh,它专注于大型交互式数据可视化,并且支持流式数据处理,特别适合大数据集下的实时可视化展示。另外,Altair库以声明式语法为基础,其简洁易读的API设计深受开发者喜爱,尤其适用于构建统计图表和数据探索性分析。 此外,对于热衷于地理信息可视化的用户来说,GeoPandas与Plotly的组合或单独使用GeoViews等库,可以高效地实现地理空间数据的可视化。而Seaborn作为基于matplotlib的数据可视化库,提供了高级接口和丰富美观的默认样式,特别适合用于绘制复杂的统计图形。 值得注意的是,随着Jupyter Notebook和JupyterLab等交互式开发环境的普及,诸如ipywidgets这样的库也开始受到关注,它们能够帮助我们在Notebook环境中创建丰富的、带有交互元素的数据可视化应用。 总之,在Python生态下,不断涌现的各种绘图工具正在满足不同场景下的可视化需求,让用户在选择时可以根据项目特点、数据类型以及个人偏好灵活选取最佳工具,从而实现更高质量的数据可视化呈现。
2023-07-14 11:34:15
119
落叶归根_t
ElasticSearch
ElasticSearch高效匹配邻近关键字? 说到搜索引擎,可能大家第一时间就会想到Google和百度等大厂的产品。其实吧,在这个大数据满天飞的时代,有一个小而精悍、威力无比的搜索引擎工具也悄悄火了起来,它就是大名鼎鼎的Elasticsearch。 那么,Elasticsearch是什么?它又有哪些特点呢?今天我们就来一起探讨一下Elasticsearch高效匹配邻近关键字的话题。 一、什么是Elasticsearch? Elasticsearch是一个基于Lucene构建的分布式搜索引擎工具,它具有实时处理海量数据、高性能的搜索能力、丰富的数据分析功能等特点。 二、为什么要匹配邻近关键字? 在实际的业务场景中,很多时候我们需要根据用户输入的关键字进行搜索。比如,在逛电商网站的时候,用户可能就会直接在搜索框里敲入“手机壳+苹果”这样的关键词去寻找他们想要的商品。这会儿,假如我们仅找出那些仅仅含有“手机壳”和“苹果”两个关键词的文档,显然这就不能满足用户真正的搜索需求啦。因此,我们就需要实现一种能够匹配邻近关键字的功能。 三、如何实现邻近匹配? 要实现邻近匹配,我们可以使用Elasticsearch中的match_phrase查询和span_first函数。首先,match_phrase查询可以用来指定要查询的完整字符串,如果文档中包含这个字符串,则匹配成功。其次,span_first函数可以让我们选择第一个匹配到的子串。 下面是一段使用Elasticsearch的示例代码: python GET /my_index/_search { "query": { "bool": { "should": [ { "match_phrase": { "title": { "query": "quick brown fox", "slop": 3, "max_expansions": 100 } } }, { "span_first": { "clauses": [ { "match": { "body": { "query": "brown fox", "slop": 3, "max_expansions": 100 } } } ], "end_offset": 30 } } ] } } } 在这个例子中,我们使用了一个布尔查询,其中包含了两个子查询:一个是match_phrase查询,另一个是span_first函数。match_phrase查询用于查找包含“quick brown fox”的文档,而span_first函数则用于查找包含“brown fox”的文档,并且确保其出现在“quick brown fox”之后。 四、如何优化邻近匹配性能? 除了使用Elasticsearch提供的工具外,我们还可以通过一些其他的手段来优化邻近匹配的性能。例如,我们可以增加索引缓存大小、减少搜索范围、合理设置匹配阈值等。 总的来说,Elasticsearch是一款非常强大的搜索引擎工具,它可以帮助我们快速地找到符合条件的数据。同时呢,我们还可以用上一些小窍门和方法,让邻近匹配这事儿变得更有效率、更精准,就像是给它装上了加速器和定位仪一样。希望本文的内容对你有所帮助!
2023-05-29 16:02:42
463
凌波微步_t
站内搜索
用于搜索本网站内部文章,支持栏目切换。
知识学习
实践的时候请根据实际情况谨慎操作。
随机学习一条linux命令:
ssh user@hostname
- 远程登录服务器。
推荐内容
推荐本栏目内的其它文章,看看还有哪些文章让你感兴趣。
2023-04-28
2023-08-09
2023-06-18
2023-04-14
2023-02-18
2023-04-17
2024-01-11
2023-10-03
2023-09-09
2023-06-13
2023-08-07
2023-03-11
历史内容
快速导航到对应月份的历史文章列表。
随便看看
拉到页底了吧,随便看看还有哪些文章你可能感兴趣。
时光飞逝
"流光容易把人抛,红了樱桃,绿了芭蕉。"