前端技术
HTML
CSS
Javascript
前端框架和UI库
VUE
ReactJS
AngularJS
JQuery
NodeJS
JSON
Element-UI
Bootstrap
Material UI
服务端和客户端
Java
Python
PHP
Golang
Scala
Kotlin
Groovy
Ruby
Lua
.net
c#
c++
后端WEB和工程框架
SpringBoot
SpringCloud
Struts2
MyBatis
Hibernate
Tornado
Beego
Go-Spring
Go Gin
Go Iris
Dubbo
HessianRPC
Maven
Gradle
数据库
MySQL
Oracle
Mongo
中间件与web容器
Redis
MemCache
Etcd
Cassandra
Kafka
RabbitMQ
RocketMQ
ActiveMQ
Nacos
Consul
Tomcat
Nginx
Netty
大数据技术
Hive
Impala
ClickHouse
DorisDB
Greenplum
PostgreSQL
HBase
Kylin
Hadoop
Apache Pig
ZooKeeper
SeaTunnel
Sqoop
Datax
Flink
Spark
Mahout
数据搜索与日志
ElasticSearch
Apache Lucene
Apache Solr
Kibana
Logstash
数据可视化与OLAP
Apache Atlas
Superset
Saiku
Tesseract
系统与容器
Linux
Shell
Docker
Kubernetes
[利用SQL命令判断数据库是否创建成功]的搜索结果
这里是文章列表。热门标签的颜色随机变换,标签颜色没有特殊含义。
点击某个标签可搜索标签相关的文章。
点击某个标签可搜索标签相关的文章。
转载文章
...媒资信息 需求分析 数据模型 Dao Service 测试 0x03 Logstash:扫描课程计划媒资 创建索引 创建模板文件 配置 mysql.conf 启动 logstash.bat Logstash多实例运行 0x04 搜素服务:查询课程媒资接口 需求分析 Api接口定义 Service Controller 测试 三、在线学习:接口开发 0x01 需求分析 0x02 搭建开发环境 0x03 Api接口 0x04 服务端开发 需求分析 搜索服务注册Eureka 搜索服务客户端 自定义错误代码 Service Controller 测试 0x05 前端开发 需求分析 api方法 配置代理 视频播放页面 简单的测试 完整的测试 1、上传文件 一些问题 ~~方案1:删除本地分块文件重新尝试上传~~ 方案2:检查前端提交的MD5值是否正确 2、为课程计划选择媒资信息 3、前端门户测试 四、待完善的一些功能 😁 认识作者 一、学习页面:查询课程计划 0x01 需求分析 到目前为止,我们已可以编辑课程计划信息并上传课程视频,下一步我们要实现在线学习页面动态读取章节对应的视频并进行播放。在线学习页面所需要的信息有两类: 课程计划信息 课程学习信息(视频地址、学习进度等) 如下图: 在线学习集成媒资管理的需求如下: 1、在线学习页面显示课程计划 2、点击课程计划播放该课程计划对应的视频 本章节实现学习页面动态显示课程计划,进入不同课程的学习页面右侧动态显示当前课程的课程计划。 0x02 Api接口 课程计划信息从哪里获取? 在课程发布完成后会自动发布到一个 course_pub 的表中,logstash 会自动将课程发布后的信息自动采集到 ES 索引库中,这些信息也包含课程计划信息。 所以考虑性能要求,课程发布后对课程的查询统一从 ES 索引库中查询。 前端通过请求 搜索服务 获取课程信息,需要单独在 搜索服务 中定义课程信息查询接口。 本接口接收课程id,查询课程所有信息返回给前端。 我们在搜素服务 API 下添加以下方法 @ApiOperation("根据id搜索课程发布信息")public Map<String,CoursePub> getdetail(String id); 返回的课程信息为 json 结构:key 为课程id,value 为课程内容。 0x03 服务端开发 在搜索服务中开发查询课程信息接口。 Controller 在搜素服务下添加以下方法 / 根据id搜索课程发布信息 @param id 课程id @return JSON数据/@Override@GetMapping("/getdetail/{id}")public Map<String, CoursePub> getdetail(@PathVariable("id")String id) {return esCourseService.getdetail(id);} Service / 根据id搜索课程发布信息 @param id 课程id @return JSON数据/public Map<String, CoursePub> getdetail(String id) {//设置索引SearchRequest searchRequest = new SearchRequest(es_index);//设置类型searchRequest.types(es_type);//创建搜索源对象SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();//设置查询条件,根据id进行查询searchSourceBuilder.query(QueryBuilders.termQuery("id",id));//这里不使用source的原字段过滤,查询所有字段// searchSourceBuilder.fetchSource(new String[]{"name", "grade", "charge","pic"}, newString[]{});//设置搜索源对象searchRequest.source(searchSourceBuilder);//执行搜索SearchResponse searchResponse = null;try {searchResponse = restHighLevelClient.search(searchRequest);} catch (IOException e) {e.printStackTrace();}//获取搜索结果SearchHits hits = searchResponse.getHits();SearchHit[] searchHits = hits.getHits(); //获取最优结果Map<String,CoursePub> map = new HashMap<>();for (SearchHit hit: searchHits) {//从搜索结果中取值并添加到coursePub对象Map<String, Object> sourceAsMap = hit.getSourceAsMap();String courseId = (String) sourceAsMap.get("id");String name = (String) sourceAsMap.get("name");String grade = (String) sourceAsMap.get("grade");String charge = (String) sourceAsMap.get("charge");String pic = (String) sourceAsMap.get("pic");String description = (String) sourceAsMap.get("description");String teachplan = (String) sourceAsMap.get("teachplan");CoursePub coursePub = new CoursePub();coursePub.setId(courseId);coursePub.setName(name);coursePub.setPic(pic);coursePub.setGrade(grade);coursePub.setTeachplan(teachplan);coursePub.setDescription(description);//设置map对象map.put(courseId,coursePub);}return map;} 测试 使用 swagger-ui 或 postman 测试查询课程信息接口。 0x04 前端开发 配置NGINX虚拟主机 学习中心的二级域名为 ucenter.xuecheng.com ,我们在 nginx 中配置 ucenter 虚拟主机。 学成网用户中心server {listen 80;server_name ucenter.xuecheng.com;个人中心location / {proxy_pass http://ucenter_server_pool;} } 前端ucenterupstream ucenter_server_pool{server 127.0.0.1:7081 weight=10;server 127.0.0.1:13000 weight=10;} 在学习中心要调用搜索的 API,使用 Nginx 解决代理,如下图: 在 ucenter 虚拟主机下配置搜索 Api 代理路径 后台搜索(公开api)upstream search_server_pool{server 127.0.0.1:40100 weight=10;} 学成网用户中心server {listen 80;server_name ucenter.xuecheng.com;个人中心location / {proxy_pass http://ucenter_server_pool;}后端搜索服务location /openapi/search/ {proxy_pass http://search_server_pool/search/;} } 前端 API 方法 在学习中心 xc-ui-pc-leanring 对课程信息的查询属于基础常用功能,所以我们将课程查询的 api 方法定义在base 模块下,如下图: 在system.js 中定义课程查询方法: import http from './public'export const course_view = id => {return http.requestGet('/openapi/search/course/getdetail/'+id);} 前端 API 方法调用 在 learning_video.vue 页面中调用课程信息查询接口得到课程计划,将课程计划json 串转成对象。 xc-ui-pc-leanring/src/module/course/page/learning_video.vue 1、定义视图 课程计划 <!--课程计划部分代码--><div class="navCont"><div class="course-weeklist"><div class="nav nav-stacked" v-for="(teachplan_first, index) in teachplanList"><div class="tit nav-justified text-center"><i class="pull-left glyphicon glyphicon-th-list"></i>{ {teachplan_first.pname} }<i class="pull-right"></i></div><li v-if="teachplan_first.children!=null" v-for="(teachplan_second, index) in teachplan_first.children"><i class="glyphicon glyphicon-check"></i><a :href="url" @click="study(teachplan_second.id)">{ {teachplan_second.pname} }</a></li><!-- <div class="tit nav-justified text-center"><i class="pull-left glyphicon glyphicon-th-list"></i>第一章<i class="pull-right"></i></div><li ><i class="glyphicon glyphicon-check"></i><a :href="url" >第一节</a></li>--><!--<li><i class="glyphicon glyphicon-unchecked"></i>为什么分为A、B、C部分</li>--></div></div></div> 课程名称 <div class="top text-center">{ {coursename} }</div> 定义数据对象 data() {return {url:'',//当前urlcourseId:'',//课程idchapter:'',//章节Idcoursename:'',//课程名称coursepic:'',//课程图片teachplanList:[],//课程计划playerOptions: {//播放参数autoplay: false,controls: true,sources: [{type: "application/x-mpegURL",src: ''}]},} } 在 created 钩子方法中获取课程信息 created(){//当前请求的urlthis.url = window.location//课程idthis.courseId = this.$route.params.courseId//章节idthis.chapter = this.$route.params.chapter//查询课程信息systemApi.course_view(this.courseId).then((view_course)=>{if(!view_course || !view_course[this.courseId]){this.$message.error("获取课程信息失败,请重新进入此页面!")return ;} let courseInfo = view_course[this.courseId]console.log(courseInfo)this.coursename = courseInfo.nameif(courseInfo.teachplan){let teachplan = JSON.parse(courseInfo.teachplan);this.teachplanList = teachplan.children;} })}, 测试 在浏览器请求:http://ucenter.xuecheng.com//learning/4028e581617f945f01617f9dabc40000/0 4028e581617f945f01617f9dabc40000:第一个参数为课程 id,测试时从 ES索引库找一个课程 id 0:第二个参数为课程计划 id,此参数用于点击课程计划播放视频。 如果出现跨域问题,但是确定已经配置了跨域,请尝试结束所以 nginx.exe 的进程 和 清空浏览器缓存。 如果还没有解决?重启电脑试试。 二、学习页面:获取视频播放地址 0x01 需求分析 用户进入在线学习页面,点击课程计划将播放该课程计划对应的教学视频。 业务流程如下: 业务流程说明: 1、用户进入在线学习页面,页面请求搜索服务获取课程信息(包括课程计划信息)并且在页面展示。 2、在线学习请求学习服务获取视频播放地址。 3、学习服务校验当前用户是否有权限学习,如果没有权限学习则提示用户。 4、学习服务校验通过,请求搜索服务获取课程媒资信息。 5、搜索服务请求ElasticSearch获取课程媒资信息。 为什么要请求 ElasticSearch 查询课程媒资信息? 出于性能的考虑,公开查询课程信息从搜索服务查询,分摊 mysql 数据库的访问压力。 什么时候将课程媒资信息存储到 ElasticSearch 中? 课程媒资信息是在课程发布的时候存入 ElasticSearch,因为课程发布后课程信息将基本不再修改。 0x02 课程发布:储存媒资信息 需求分析 课程媒资信息是在课程发布的时候存入 ElasticSearch 索引库,因为课程发布后课程信息将基本不再修改,具体的业务流程如下。 1、课程发布,向课程媒资信息表写入数据。 1)根据课程 id 删除 teachplanMediaPub 中的数据 2)根据课程 id 查询 teachplanMedia 数据 3)将查询到的 teachplanMedia 数据插入到 teachplanMediaPub 中 2、Logstash 定时扫描课程媒资信息表,并将课程媒资信息写入索引库。 数据模型 在 xc_course 数据库创建课程计划媒资发布表: CREATE TABLE teachplan_media_pub (teachplan_id varchar(32) NOT NULL COMMENT '课程计划id',media_id varchar(32) NOT NULL COMMENT '媒资文件id',media_fileoriginalname varchar(128) NOT NULL COMMENT '媒资文件的原始名称',media_url varchar(256) NOT NULL COMMENT '媒资文件访问地址',courseid varchar(32) NOT NULL COMMENT '课程Id',timestamp timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT'logstash使用',PRIMARY KEY (teachplan_id)) ENGINE=InnoDB DEFAULT CHARSET=utf8 数据模型类如下: package com.xuecheng.framework.domain.course;import lombok.Data;import lombok.ToString;import org.hibernate.annotations.GenericGenerator;import javax.persistence.;import java.io.Serializable;import java.util.Date;@Data@ToString@Entity@Table(name="teachplan_media_pub")@GenericGenerator(name = "jpa-assigned", strategy = "assigned")public class TeachplanMediaPub implements Serializable {private static final long serialVersionUID = -916357110051689485L;@Id@GeneratedValue(generator = "jpa-assigned")@Column(name="teachplan_id")private String teachplanId;@Column(name="media_id")private String mediaId;@Column(name="media_fileoriginalname")private String mediaFileOriginalName;@Column(name="media_url")private String mediaUrl;@Column(name="courseid")private String courseId;@Column(name="timestamp")private Date timestamp;//时间戳} Dao 创建 TeachplanMediaPub 表的 Dao,向 TeachplanMediaPub 存储信息采用先删除该课程的媒资信息,再添加该课程的媒资信息,所以这里定义根据课程 id 删除课程计划媒资方法: public interface TeachplanMediaPubRepository extends JpaRepository<TeachplanMediaPub, String> {//根据课程id删除课程计划媒资信息long deleteByCourseId(String courseId);} 从TeachplanMedia查询课程计划媒资信息 //从TeachplanMedia查询课程计划媒资信息public interface TeachplanMediaRepository extends JpaRepository<TeachplanMedia, String> {List<TeachplanMedia> findByCourseId(String courseId);} Service 编写保存课程计划媒资信息方法,并在课程发布时调用此方法。 1、保存课程计划媒资信息方法 本方法采用先删除该课程的媒资信息,再添加该课程的媒资信息,在 CourseService 下定义该方法 //保存课程计划媒资信息private void saveTeachplanMediaPub(String courseId){//查询课程媒资信息List<TeachplanMedia> byCourseId = teachplanMediaRepository.findByCourseId(courseId);if(byCourseId == null) return; //没有查询到媒资数据则直接结束该方法//将课程计划媒资信息储存到待索引表//删除原有的索引信息teachplanMediaPubRepository.deleteByCourseId(courseId);//一个课程可能会有多个媒资信息,遍历并使用list进行储存List<TeachplanMediaPub> teachplanMediaPubList = new ArrayList<>();for (TeachplanMedia teachplanMedia: byCourseId) {TeachplanMediaPub teachplanMediaPub = new TeachplanMediaPub();BeanUtils.copyProperties(teachplanMedia, teachplanMediaPub);teachplanMediaPubList.add(teachplanMediaPub);}//保存所有信息teachplanMediaPubRepository.saveAll(teachplanMediaPubList);} 2、课程发布时调用此方法 修改课程发布的 coursePublish 方法: ....//保存课程计划媒资信息到待索引表saveTeachplanMediaPub(courseId);//页面urlString pageUrl = cmsPostPageResult.getPageUrl();return new CoursePublishResult(CommonCode.SUCCESS,pageUrl);..... 测试 测试课程发布后是否成功将课程媒资信息存储到 teachplan_media_pub 中,测试流程如下: 1、指定一个课程 2、为课程计划添加课程媒资 3、执行课程发布 4、观察课程计划媒资信息是否存储至 teachplan_media_pub 中 注意:由于此测试仅用于测试发布课程计划媒资信息的功能,可暂时将 cms页面发布的功能暂时屏蔽,提高测试效率。 测试结果如下 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Vrzs5589-1595567273126)(https://qnoss.codeyee.com/20200704_15/image7)] 0x03 Logstash:扫描课程计划媒资 Logstash 定时扫描课程媒资信息表,并将课程媒资信息写入索引库。 创建索引 1、创建 xc_course_media 索引 2、并向此索引创建如下映射 POST: http://localhost:9200/xc_course_media/doc/_mapping {"properties" : {"courseid" : {"type" : "keyword"},"teachplan_id" : {"type" : "keyword"},"media_id" : {"type" : "keyword"},"media_url" : {"index" : false,"type" : "text"},"media_fileoriginalname" : {"index" : false,"type" : "text"} }} 索引创建成功 创建模板文件 在 logstach 的 config 目录文件 xc_course_media_template.json 文件路径为 %ES_ROOT_DIR%/logstash6.8.8/config/xc_course_media_template.json %ES_ROOT_DIR% 为 ElasticSearch 和 logstash 的安装目录 内容如下: {"mappings" : {"doc" : {"properties" : {"courseid" : {"type" : "keyword"},"teachplan_id" : {"type" : "keyword"},"media_id" : {"type" : "keyword"},"media_url" : {"index" : false,"type" : "text"},"media_fileoriginalname" : {"index" : false,"type" : "text"} }},"template" : "xc_course_media"} } 配置 mysql.conf 在logstash的 config 目录下配置 mysql_course_media.conf 文件供 logstash 使用,logstash 会根据 mysql_course_media.conf 文件的配置的地址从 MySQL 中读取数据向 ES 中写入索引。 参考https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html 配置输入数据源和输出数据源。 input {stdin {} jdbc {jdbc_connection_string => "jdbc:mysql://localhost:3306/xc_course?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC" 数据库信息jdbc_user => "root"jdbc_password => "123123" MYSQL 驱动地址,修改为maven仓库对应的位置jdbc_driver_library => "D:/soft/apache-maven-3.5.4/repository/mysql/mysql-connector-java/5.1.40/mysql-connector-java-5.1.40.jar" the name of the driver class for mysqljdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_paging_enabled => "true"jdbc_page_size => "50000"要执行的sql文件statement_filepath => "/conf/course.sql"statement => "select from teachplan_media_pub where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)"定时配置schedule => " "record_last_run => truelast_run_metadata_path => "D:/soft/elasticsearch/logstash-6.8.8/config/xc_course_media_metadata"} } output {elasticsearch {ES的ip地址和端口hosts => "localhost:9200"hosts => ["localhost:9200","localhost:9202","localhost:9203"]ES索引库名称index => "xc_course_media"document_id => "%{teachplan_id}"document_type => "doc"template => "D:/soft/elasticsearch/logstash-6.8.8/config/xc_course_media_template.json"template_name =>"xc_course_media"template_overwrite =>"true"} stdout {日志输出codec => json_lines} } 启动 logstash.bat 启动 logstash.bat 采集 teachplan_media_pub 中的数据,向 ES 写入索引。 logstash.bat -f ../config/mysql_course_media.conf 课程发布成功后,Logstash 会自动参加 teachplan_media_pub 表中新增的数据,效果如下 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ILPBxfXi-1595567273134)(https://qnoss.codeyee.com/20200704_15/image10)] Logstash多实例运行 由于之前我们还启动了一个 Logstash 对课程的发布信息进行采集,所以如果想两个 logstash 实例同时运行,因为每个实例都有一个.lock文件,所以不能使用同一个目录来存放数据,所以我们需要使用 --path.data= 为每个实例指定单独的数据目录,具体的代码如下: 该配置是在windows下进行的 课程发布实例 logstash_start_course_pub.bat @title logstash in course_publogstash.bat -f ..\config\mysql.conf --path.data=../data/course_pub 课程计划媒体发布实例 logstash_start_teachplan_media.bat @title logstash i n teachplan_media_publogstash.bat -f ../config/mysql_course_media.conf --path.data=../data/teachplan_media/ 同时运行效果如下 0x04 搜素服务:查询课程媒资接口 需求分析 搜索服务 提供查询课程媒资接口,此接口供学习服务调用。 Api接口定义 @ApiOperation("根据课程计划查询媒资信息")public TeachplanMediaPub getmedia(String teachplanId); Service 1、配置课程计划媒资索引库等信息 在 application.yml 中配置 xuecheng:elasticsearch:hostlist: ${eshostlist:127.0.0.1:9200} 多个结点中间用逗号分隔course:index: xc_coursetype: docsource_field: id,name,grade,mt,st,charge,valid,pic,qq,price,price_old,status,studymodel,teachmode,expires,pub_time,start_time,end_timemedia:index: xc_course_mediatype: docsource_field: courseid,media_id,media_url,teachplan_id,media_fileoriginalname 2、service 方法开发 在 课程搜索服务 中定义课程媒资查询接口,为了适应后续需求,service 参数定义为数组,可一次查询多个课程计划的媒资信息。 / 根据一个或者多个课程计划id查询媒资信息 @param teachplanIds 课程id @return QueryResponseResult/public QueryResponseResult<TeachplanMediaPub> getmedia(String [] teachplanIds){//设置索引SearchRequest searchRequest = new SearchRequest(media_index);//设置类型searchRequest.types(media_type);//创建搜索源对象SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();//源字段过滤String[] media_index_arr = media_field.split(",");searchSourceBuilder.fetchSource(media_index_arr, new String[]{});//查询条件,根据课程计划id查询(可以传入多个课程计划id)searchSourceBuilder.query(QueryBuilders.termsQuery("teachplan_id", teachplanIds));searchRequest.source(searchSourceBuilder);SearchResponse searchResponse = null;try {searchResponse = restHighLevelClient.search(searchRequest);} catch (IOException e) {e.printStackTrace();}//获取结果SearchHits hits = searchResponse.getHits();long totalHits = hits.getTotalHits();SearchHit[] searchHits = hits.getHits();//数据列表List<TeachplanMediaPub> teachplanMediaPubList = new ArrayList<>();for(SearchHit hit:searchHits){TeachplanMediaPub teachplanMediaPub =new TeachplanMediaPub();Map<String, Object> sourceAsMap = hit.getSourceAsMap();//取出课程计划媒资信息String courseid = (String) sourceAsMap.get("courseid");String media_id = (String) sourceAsMap.get("media_id");String media_url = (String) sourceAsMap.get("media_url");String teachplan_id = (String) sourceAsMap.get("teachplan_id");String media_fileoriginalname = (String) sourceAsMap.get("media_fileoriginalname");teachplanMediaPub.setCourseId(courseid);teachplanMediaPub.setMediaUrl(media_url);teachplanMediaPub.setMediaFileOriginalName(media_fileoriginalname);teachplanMediaPub.setMediaId(media_id);teachplanMediaPub.setTeachplanId(teachplan_id);//将对象加入到列表中teachplanMediaPubList.add(teachplanMediaPub);}//构建返回课程媒资信息对象QueryResult<TeachplanMediaPub> queryResult = new QueryResult<>();queryResult.setList(teachplanMediaPubList);queryResult.setTotal(totalHits);return new QueryResponseResult<TeachplanMediaPub>(CommonCode.SUCCESS,queryResult);} Controller / 根据课程计划id搜索发布后的媒资信息 @param teachplanId @return/@GetMapping(value="/getmedia/{teachplanId}")@Overridepublic TeachplanMediaPub getmedia(@PathVariable("teachplanId") String teachplanId) {//为了service的拓展性,所以我们service接收的是数组作为参数,以便后续开发查询多个ID的接口String[] teachplanIds = new String[]{teachplanId};//通过service查询ES获取课程媒资信息QueryResponseResult<TeachplanMediaPub> mediaPubQueryResponseResult = esCourseService.getmedia(teachplanIds);QueryResult<TeachplanMediaPub> queryResult = mediaPubQueryResponseResult.getQueryResult();if(queryResult!=null&& queryResult.getList()!=null&& queryResult.getList().size()>0){//返回课程计划对应课程媒资return queryResult.getList().get(0);} return new TeachplanMediaPub();} 测试 使用 swagger-ui 和 postman 测试课程媒资查询接口。 三、在线学习:接口开发 0x01 需求分析 根据下边的业务流程,本章节完成前端学习页面请求学习服务获取课程视频地址,并自动播放视频。 0x02 搭建开发环境 1、创建数据库 创建 xc_learning 数据库,学习数据库将记录学生的选课信息、学习信息。 导入:资料/xc_learning.sql 2、创建学习服务工程 参考课程管理服务工程结构,创建学习服务工程: 导入:资料/xc-service-learning.zip 项目工程结构如下 0x03 Api接口 此 api 接口是课程学习页面请求学习服务获取课程学习地址。 定义返回值类型: package com.xuecheng.framework.domain.learning.response;import com.xuecheng.framework.model.response.ResponseResult;import com.xuecheng.framework.model.response.ResultCode;import lombok.Data;import lombok.NoArgsConstructor;import lombok.ToString;@Data@ToString@NoArgsConstructorpublic class GetMediaResult extends ResponseResult {public GetMediaResult(ResultCode resultCode, String fileUrl) {super(resultCode);this.fileUrl = fileUrl;}//媒资文件播放地址private String fileUrl;} 定义接口,学习服务根据传入课程 ID、章节 Id(课程计划 ID)来取学习地址。 @Api(value = "录播课程学习管理",description = "录播课程学习管理")public interface CourseLearningControllerApi {@ApiOperation("获取课程学习地址")public GetMediaResult getMediaPlayUrl(String courseId,String teachplanId);} 0x04 服务端开发 需求分析 学习服务根据传入课程ID、章节Id(课程计划ID)请求搜索服务获取学习地址。 搜索服务注册Eureka 学习服务要调用搜索服务查询课程媒资信息,所以需要将搜索服务注册到 eureka 中。 1、查看服务名称是否为 xc-service-search 注意修改application.xml中的服务名称:spring:application:name: xc‐service‐search 2、配置搜索服务的配置文件 application.yml,加入 Eureka 配置 如下: eureka:client:registerWithEureka: true 服务注册开关fetchRegistry: true 服务发现开关serviceUrl: Eureka客户端与Eureka服务端进行交互的地址,多个中间用逗号分隔defaultZone: ${EUREKA_SERVER:http://localhost:50101/eureka/,http://localhost:50102/eureka/}instance:prefer-ip-address: true 将自己的ip地址注册到Eureka服务中ip-address: ${IP_ADDRESS:127.0.0.1}instance-id: ${spring.application.name}:${server.port} 指定实例idribbon:MaxAutoRetries: 2 最大重试次数,当Eureka中可以找到服务,但是服务连不上时将会重试,如果eureka中找不到服务则直接走断路器MaxAutoRetriesNextServer: 3 切换实例的重试次数OkToRetryOnAllOperations: false 对所有操作请求都进行重试,如果是get则可以,如果是post,put等操作没有实现幂等的情况下是很危险的,所以设置为falseConnectTimeout: 5000 请求连接的超时时间ReadTimeout: 6000 请求处理的超时时间 3、添加 eureka 依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring‐cloud‐starter‐netflix‐eureka‐client</artifactId></dependency> 4、修改启动类,在class上添加如下注解: @EnableDiscoveryClient 搜索服务客户端 在 学习服务 创建搜索服务的客户端接口,此接口会生成代理对象,调用搜索服务: package com.xuecheng.learning.client;import com.xuecheng.framework.domain.course.TeachplanMediaPub;import org.springframework.cloud.openfeign.FeignClient;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;@FeignClient(value = "xc‐service‐search")public interface CourseSearchClient {@GetMapping(value="/getmedia/{teachplanId}")public TeachplanMediaPub getmedia(@PathVariable("teachplanId") String teachplanId);} 自定义错误代码 我们在 com.xuecheng.framework.domain.learning.response 包下自定义一个错误消息模型 package com.xuecheng.framework.domain.learning.response;import com.xuecheng.framework.model.response.ResultCode;import lombok.ToString;@ToStringpublic enum LearningCode implements ResultCode {LEARNING_GET_MEDIA_ERROR(false,23001,"学习中心获取媒资信息错误!");//操作代码boolean success;//操作代码int code;//提示信息String message;private LearningCode(boolean success, int code, String message){this.success = success;this.code = code;this.message = message;}@Overridepublic boolean success() {return success;}@Overridepublic int code() {return code;}@Overridepublic String message() {return message;} } 该消息模型基于 ResultCode 来实现,代码如下 package com.xuecheng.framework.model.response;/ Created by mrt on 2018/3/5. 10000-- 通用错误代码 22000-- 媒资错误代码 23000-- 用户中心错误代码 24000-- cms错误代码 25000-- 文件系统/public interface ResultCode {//操作是否成功,true为成功,false操作失败boolean success();//操作代码int code();//提示信息String message(); 从 ResultCode 中我们可以看出,我们约定了用户中心的错误代码使用 23000,所以我们定义的一些错误信息的代码就从 23000 开始计数。 Service 在学习服务中定义 service 方法,此方法远程请求课程管理服务、媒资管理服务获取课程学习地址。 package com.xuecheng.learning.service.impl;import com.netflix.discovery.converters.Auto;import com.xuecheng.framework.domain.course.TeachplanMediaPub;import com.xuecheng.framework.domain.learning.response.GetMediaResult;import com.xuecheng.framework.exception.ExceptionCast;import com.xuecheng.framework.model.response.CommonCode;import com.xuecheng.learning.client.CourseSearchClient;import com.xuecheng.learning.service.LearningService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Servicepublic class LearningServiceImpl implements LearningService {@AutowiredCourseSearchClient courseSearchClient;/ 远程调用搜索服务获取已发布媒体信息中的url @param courseId 课程id @param teachplanId 媒体信息id @return/@Overridepublic GetMediaResult getMediaPlayUrl(String courseId, String teachplanId) {//校验学生权限,是否已付费等//远程调用搜索服务进行查询媒体信息TeachplanMediaPub mediaPub = courseSearchClient.getmedia(teachplanId);if(mediaPub == null) ExceptionCast.cast(CommonCode.FAIL);return new GetMediaResult(CommonCode.SUCCESS, mediaPub.getMediaUrl());} } Controller 调用 service 根据课程计划 id 查询视频播放地址: @RestController@RequestMapping("/learning/course")public class CourseLearningController implements CourseLearningControllerApi {@AutowiredLearningService learningService;@Override@GetMapping("/getmedia/{courseId}/{teachplanId}")public GetMediaResult getMediaPlayUrl(@PathVariable String courseId, @PathVariable String teachplanId) {//获取课程学习地址return learningService.getMedia(courseId, teachplanId);} } 测试 使用 swagger-ui 或postman 测试学习服务查询课程视频地址接口。 0x05 前端开发 需求分析 需要在学习中心前端页面需要完成如下功能: 1、进入课程学习页面需要带上 课程 Id参数及课程计划Id的参数,其中 课程 Id 参数必带,课程计划 Id 可以为空。 2、进入页面根据 课程 Id 取出该课程的课程计划显示在右侧。 3、进入页面后判断如果请求参数中有课程计划 Id 则播放该章节的视频。 4、进入页面后判断如果 课程计划id 为0则需要取出本课程第一个 课程计划的Id,并播放第一个课程计划的视频。 进入到模块 xc-ui-pc-leanring/src/module/course api方法 let sysConfig = require('@/../config/sysConfig')let apiUrl = sysConfig.xcApiUrlPre;/获取播放地址/export const get_media = (courseId,chapter) => {return http.requestGet(apiUrl+'/api/learning/course/getmedia/'+courseId+'/'+chapter);} 配置代理 在 Nginx 中的 ucenter.xuecheng.com 虚拟主机中配置 /api/learning/ 的路径转发,此url 请转发到学习服务。 学习服务upstream learning_server_pool{server 127.0.0.1:40600 weight=10;}学成网用户中心server {listen 80;server_name ucenter.xuecheng.com;个人中心location / {proxy_pass http://ucenter_server_pool;}后端搜索服务location /openapi/search/ {proxy_pass http://search_server_pool/search/; }学习服务location ^~ /api/learning/ {proxy_pass http://learning_server_pool/learning/;} } 视频播放页面 1、如果传入的课程计划id为0则取出第一个课程计划id 在 created 钩子方法中完成 created(){//当前请求的urlthis.url = window.location//课程idthis.courseId = this.$route.params.courseId//章节idthis.chapter = this.$route.params.chapter//查询课程信息systemApi.course_view(this.courseId).then((view_course)=>{if(!view_course || !view_course[this.courseId]){this.$message.error("获取课程信息失败,请重新进入此页面!")return ;}let courseInfo = view_course[this.courseId]console.log(courseInfo)this.coursename = courseInfo.nameif(courseInfo.teachplan){console.log("准备开始播放视频")let teachplan = JSON.parse(courseInfo.teachplan);this.teachplanList = teachplan.children;//开始学习if(this.chapter == "0" || !this.chapter){//取出第一个教学计划this.chapter = this.getFirstTeachplan();console.log("第一个教学计划id为 ",this.chapter);this.study(this.chapter);}else{this.study(this.chapter);} }})}, 取出第一个章节 id,用户未输入课程计划 id 或者输入为 0 时,播放第一个。 //取出第一个章节getFirstTeachplan(){for(var i=0;i<this.teachplanList.length;i++){let firstTeachplan = this.teachplanList[i];//如果当前children存在,则取出第一个返回if(firstTeachplan.children && firstTeachplan.children.length>0){let secondTeachplan = firstTeachplan.children[0];return secondTeachplan.id;} }return ;}, 开始学习: //开始学习study(chapter){// 获取播放地址courseApi.get_media(this.courseId,chapter).then((res)=>{if(res.success){let fileUrl = sysConfig.videoUrl + res.fileUrl//播放视频this.playvideo(fileUrl)}else if(res.message){this.$message.error(res.message)}else{this.$message.error("播放视频失败,请刷新页面重试")} }).catch(res=>{this.$message.error("播放视频失败,请刷新页面重试")});}, 2、点击右侧课程章节切换播放 在原有代码基础上添加 click 事件,点击调用开始学习方法(study)。 <li v‐if="teachplan_first.children!=null" v‐for="(teachplan_second, index) inteachplan_first.children"><i class="glyphicon glyphicon‐check"></i><a :href="url" @click="study(teachplan_second.id)">{ {teachplan_second.pname} }</a></li> 3、地址栏路由url变更 这里需要注意一个问题,在用户点击课程章节切换播放时,地址栏的 url 也应该同步改变为当前所选择的课程计划 id 4、在线学习按钮 将 learnstatus 默认更改为 1,这样就能显示出马上学习的按钮,方便我们后续的集成测试。 文件路径为 xc-ui-pc-static-portal/include/course_detail_dynamic.html 部分代码块如下 <script>var body= new Vue({ //创建一个Vue的实例el: "body", //挂载点是id="app"的地方data: {editLoading: false,title:'测试',courseId:'',charge:'',//203001免费,203002收费learnstatus: 1 ,//课程状态,1:马上学习,2:立即报名、3:立即购买course:{},companyId:'template',company_stat:[],course_stat:{"s601001":"","s601002":"","s601003":""} }, 简单的测试 访问在线学习页面:http://ucenter.xuecheng.com//learning/课程id/课程计划id 通过 url 传入两个参数:课程id 和 课程计划id 如果没有课程计划则传入0 测试项目如下: 1、传入正确的课程id、课程计划id,自动播放本章节的视频 2、传入正确的课程id、课程计划id传入0,自动播放第一个视频 3、传入错误的课程id 或 课程计划id,提示错误信息。 4、通过右侧章节目录切换章节及播放视频。 访问: http://ucenter.xuecheng.com//learning/4028e58161bcf7f40161bcf8b77c0000/4028e58161bd18ea0161bd1f73190008 传入正确的课程id、课程计划id,自动播放本章节的视频 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ef0xxym7-1595567273153)(https://qnoss.codeyee.com/20200704_15/image17)] 传入正确的课程id、课程计划id传入0,自动播放第一个视频 访问 http://ucenter.xuecheng.com//learning/4028e58161bcf7f40161bcf8b77c0000/0 识别出第一个课程计划的 id 需要注意的是这里的 chapter 参数是我自己在 study 函数里加上去的,可以忽略。 传入错误的课程id或课程计划id,提示错误信息。 通过右侧章节目录切换章节及播放视频。 点击章节即可播放,但是点击制定章节后 url 没有发生改变,这个问题暂时还没有解决,关注笔记后面的内容。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TOGdxwb4-1595567273158)(https://qnoss.codeyee.com/20200704_15/image20)] 完整的测试 准备工作 启动 RabbitMQ,启动 Logstash、ElasticSearch 建议把所有后端服务都开起来 启动 前端静态门户、启动 nginx 、启动课程管理前端 我们整理一下测试的流程 上传两个媒资视频文件,用于测试 进入到课程管理,为课程计划选择媒资信息 发布课程,等待 logstash 将数据采集到 ElasticSearch 的索引库中 进入学成网主页,点击课程,进入到搜索门户页面 搜索课程,进入到课程详情页面 点击开始学习,进入到课程学习页面,选择课程计划中的一个章节进行学习。 1、上传文件 首先我们使用之前开发的媒资管理模块,上传两个视频文件用于测试。 第一个文件上传成功 一些问题 在上传第二个文件时,发生了错误,我们来检查一下问题出在了哪里 在媒体服务的控制台中可以看到,在 mergeChunks 方法在校验文件 md5 时候抛出了异常 我们在 MD5 校验这里打个断点,重新上传文件,分析一下问题所在。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OpEMZGI8-1595567273166)(https://qnoss.codeyee.com/20200704_15/image23)] 单步调试后发现,合并文件后的MD5值与用户上传的源文件值不相等 方案1:删除本地分块文件重新尝试上传 考虑到可能是在用户上传完 视频的分块文件时发生了一些问题,导致合并文件后与源文件的大小不等,导致MD5也不相同,这里我们把这个视频上传到本地的文件全部删除,在媒资上传页面重新上传文件。 对比所有分块文件的字节大小和本地源文件的大小,完全是相等的 删除所有文件后重新上传,md5值还是不等,考虑从调试一下文件合并的代码。 方案2:检查前端提交的MD5值是否正确 在查阅是否有其他的MD5值获取方案时,发现了一个使用 windows 本地命令获取文件MD5值的方法 certutil -hashfile .\19-在线学习接口-集成测试.avi md5 惊奇的发现,TM的原来是前端那边转换的MD5值不正确,后端这边是没有问题的。 从前面的图可以看出,本地和后端转换的都是以一个 f6f0 开头的MD5值 那么问题就出现在前端了,还需要花一些时间去分析一下,这里暂时就先告一段落,因为上传了几个文件测试中只有这一个文件出现了问题。 2、为课程计划选择媒资信息 进入到一个课程的管理页面 http://localhost:12000//course/manage/baseinfo/4028e58161bcf7f40161bcf8b77c0000 将刚才我们上传的媒资文件的信息和课程计划绑定 选择效果如下 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-epKaqzCD-1595567273178)(https://qnoss.codeyee.com/20200704_15/image29)] 2、发布课程,等待 logstash 从 course_pub 以及 teachplan_media_pub 表中采集数据到 ElasticSearch 当中 发布成功后,我们可以从 teachplan_media_pub 表中看到刚才我们发布的媒资信息 再观察 Logstash 的控制台,发现两个 Logstash 的实例都对更新的课程发布信息进行了采集 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hTUve2ik-1595567273183)(https://qnoss.codeyee.com/20200704_15/image32)] 3、前端门户测试 打开我们的门户主站 http://www.xuecheng.com/ [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4wZe9R84-1595567273185)(https://qnoss.codeyee.com/20200704_15/image33)] 点击导航栏的课程,进入到我们的搜索门户页面 如果无法进入到搜索门户,请检查你的 xc-ui-pc-portal 前端工程是否已经启动 进入到搜索门户后,可以看到一些初始化时搜索的课程数据,默认是搜索第一页的数据,每页2个课程。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BJ1AKoJb-1595567273187)(https://qnoss.codeyee.com/20200704_15/image34)] 我们可以测试搜索一下前面我们选择媒资信息时所用的课程 点击课程,进入到课程详情页面,然后再点击开始学习。 点击马上学习后,会进入到该课程的在线学习页面,默认自动播放我们第一个课程计划中的视频。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tcuLWnf2-1595567273193)(https://qnoss.codeyee.com/20200704_15/image37)] 我们可以在右侧的目录中选择第二个课程计划,会自动播放所选的课程计划所对应的媒资视频播放地址,该 播放地址正是我们刚才通过 Logstash 自动采集到 ElasticSearch 的索引信息,效果图如下 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Cvi9Dr0Y-1595567273195)(https://qnoss.codeyee.com/20200704_15/image38)] 四、待完善的一些功能 课程发布前,校验课程计划里面是否包含二级课程计划 课程发布前,校验课程计划信息里面是否全部包含媒资信息 删除媒资信息,并且同步删除ES中的索引 在获取该课程的播放地址时校验用户的合法、 在线学习页面,点击右侧目录中的课程计划同时改变url中的课程计划地址 视频文件 19-在线学习接口-集成测试.avi 前端上传时提交的MD5值不正确 😁 认识作者 作者:👦 LCyee ,全干型代码🐕 自建博客:https://www.codeyee.com 记录学习以及项目开发过程中的笔记与心得,记录认知迭代的过程,分享想法与观点。 CSDN 博客:https://blog.csdn.net/codeyee 记录和分享一些开发过程中遇到的问题以及解决的思路。 欢迎加入微服务练习生的队伍,一起交流项目学习过程中的一些问题、分享学习心得等,不定期组织一起刷题、刷项目,共同见证成长。 本篇文章为转载内容。原文链接:https://blog.csdn.net/codeyee/article/details/107558901。 该文由互联网用户投稿提供,文中观点代表作者本人意见,并不代表本站的立场。 作为信息平台,本站仅提供文章转载服务,并不拥有其所有权,也不对文章内容的真实性、准确性和合法性承担责任。 如发现本文存在侵权、违法、违规或事实不符的情况,请及时联系我们,我们将第一时间进行核实并删除相应内容。
2023-12-16 12:41:01
73
转载
转载文章
...le高级消息队列AQ创建消息负荷payload 创建队列表 创建队列并启动 队列的停止和删除 入队消息 出队消息 二Java使用JMS监听并处理Oracle AQ队列创建连接参数类 创建消息转换类 主类进行消息处理 三监控表记录变化通知Java创建表 创建存储过程 创建触发器 环境说明 本实验环境基于Oracle 12C和JDK1.8,其中Oracle 12C支持多租户特性,相较于之前的Oracle版本,使用‘C用户名‘表示用户,例如如果数据库用户叫kevin,则登陆时使用Ckevin进行登陆。 一、Oracle高级消息队列AQ Oracle AQ是Oracle中的消息队列,是Oracle中的一种高级应用,每个版本都在不断的加强,使用DBMS_AQ系统包进行相应的操作,是Oracle的默认组件,只要安装了Oracle数据库就可以使用。使用AQ可以在多个Oracle数据库、Oracle与Java、C等系统中进行数据传输。 下面分步骤说明如何创建Oracle AQ 1. 创建消息负荷payload Oracle AQ中传递的消息被称为有效负荷(payloads),格式可以是用户自定义对象或XMLType或ANYDATA。本例中我们创建一个简单的对象类型用于传递消息。 create type demo_queue_payload_type as object (message varchar2(4000)); 2. 创建队列表 队列表用于存储消息,在入队时自动存入表中,出队时自动删除。使用DBMS_AQADM包进行数据表的创建,只需要写表名,同时设置相应的属性。对于队列需要设置multiple_consumers为false,如果使用发布/订阅模式需要设置为true。 begin dbms_aqadm.create_queue_table( queue_table => 'demo_queue_table', queue_payload_type => 'demo_queue_payload_type', multiple_consumers => false ); end; 执行完后可以查看oracle表中自动生成了demo_queue_table表,可以查看影响子段(含义比较清晰)。 3. 创建队列并启动 创建队列并启动队列: begin dbms_aqadm.create_queue ( queue_name => 'demo_queue', queue_table => 'demo_queue_table' ); dbms_aqadm.start_queue( queue_name => 'demo_queue' ); end; 至此,我们已经创建了队列有效负荷,队列表和队列。可以查看以下系统创建了哪些相关的对象: SELECT object_name, object_type FROM user_objects WHERE object_name != 'DEMO_QUEUE_PAYLOAD_TYPE'; OBJECT_NAME OBJECT_TYPE ------------------------------ --------------- DEMO_QUEUE_TABLE TABLE SYS_C009392 INDEX SYS_LOB0000060502C00030$$ LOB AQ$_DEMO_QUEUE_TABLE_T INDEX AQ$_DEMO_QUEUE_TABLE_I INDEX AQ$_DEMO_QUEUE_TABLE_E QUEUE AQ$DEMO_QUEUE_TABLE VIEW DEMO_QUEUE QUEUE 我们看到一个队列带出了一系列自动生成对象,有些是被后面直接用到的。不过有趣的是,创建了第二个队列。这就是所谓的异常队列(exception queue)。如果AQ无法从我们的队列接收消息,将记录在该异常队列中。 消息多次处理出错等情况会自动转移到异常的队列,对于异常队列如何处理目前笔者还没有找到相应的写法,因为我使用的场景并不要求消息必须一对一的被处理,只要起到通知的作用即可。所以如果消息转移到异常队列,可以执行清空队列表中的数据 delete from demo_queue_table; 4. 队列的停止和删除 如果需要删除或重建可以使用下面的方法进行操作: BEGIN DBMS_AQADM.STOP_QUEUE( queue_name => 'demo_queue' ); DBMS_AQADM.DROP_QUEUE( queue_name => 'demo_queue' ); DBMS_AQADM.DROP_QUEUE_TABLE( queue_table => 'demo_queue_table' ); END; 5. 入队消息 入列操作是一个基本的事务操作(就像往队列表Insert),因此我们需要提交。 declare r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T; r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_message_handle RAW(16); o_payload demo_queue_payload_type; begin o_payload := demo_queue_payload_type('what is you name ?'); dbms_aq.enqueue( queue_name => 'demo_queue', enqueue_options => r_enqueue_options, message_properties => r_message_properties, payload => o_payload, msgid => v_message_handle ); commit; end; 通过SQL语句查看消息是否正常入队: select from aq$demo_queue_table; select user_data from aq$demo_queue_table; 6. 出队消息 使用Oracle进行出队操作,我没有实验成功(不确定是否和DBMS_OUTPUT的执行权限有关),代码如下,读者可以进行调试: declare r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T; r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_message_handle RAW(16); o_payload demo_queue_payload_type; begin DBMS_AQ.DEQUEUE( queue_name => 'demo_queue', dequeue_options => r_dequeue_options, message_properties => r_message_properties, payload => o_payload, msgid => v_message_handle ); DBMS_OUTPUT.PUT_LINE( ' Browse message is [' || o_payload.message || ']' ); end; 二、Java使用JMS监听并处理Oracle AQ队列 Java使用JMS进行相应的处理,需要使用Oracle提供的jar,在Oracle安装目录可以找到:在linux中可以使用find命令进行查找,例如 find pwd -name 'jmscommon.jar' 需要的jar为: app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/jmscommon.jar app/oracle/product/12.1.0/dbhome_1/jdbc/lib/ojdbc7.jar app/oracle/product/12.1.0/dbhome_1/jlib/orai18n.jar app/oracle/product/12.1.0/dbhome_1/jlib/jta.jar app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/aqapi_g.jar 1. 创建连接参数类 实际使用时可以把参数信息配置在properties文件中,使用Spring进行注入。 package org.kevin.jms; / @author 李文锴 连接参数信息 / public class JmsConfig { public String username = "ckevin"; public String password = "a111111111"; public String jdbcUrl = "jdbc:oracle:thin:@127.0.0.1:1521:orcl"; public String queueName = "demo_queue"; } 2. 创建消息转换类 因为消息载荷是Oracle数据类型,需要提供一个转换工厂类将Oracle类型转换为Java类型。 package org.kevin.jms; import java.sql.SQLException; import oracle.jdbc.driver.OracleConnection; import oracle.jdbc.internal.OracleTypes; import oracle.jpub.runtime.MutableStruct; import oracle.sql.CustomDatum; import oracle.sql.CustomDatumFactory; import oracle.sql.Datum; import oracle.sql.STRUCT; / @author 李文锴 数据类型转换类 / @SuppressWarnings("deprecation") public class QUEUE_MESSAGE_TYPE implements CustomDatum, CustomDatumFactory { public static final String _SQL_NAME = "QUEUE_MESSAGE_TYPE"; public static final int _SQL_TYPECODE = OracleTypes.STRUCT; MutableStruct _struct; // 12表示字符串 static int[] _sqlType = { 12 }; static CustomDatumFactory[] _factory = new CustomDatumFactory[1]; static final QUEUE_MESSAGE_TYPE _MessageFactory = new QUEUE_MESSAGE_TYPE(); public static CustomDatumFactory getFactory() { return _MessageFactory; } public QUEUE_MESSAGE_TYPE() { _struct = new MutableStruct(new Object[1], _sqlType, _factory); } public Datum toDatum(OracleConnection c) throws SQLException { return _struct.toDatum(c, _SQL_NAME); } public CustomDatum create(Datum d, int sqlType) throws SQLException { if (d == null) return null; QUEUE_MESSAGE_TYPE o = new QUEUE_MESSAGE_TYPE(); o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory); return o; } public String getContent() throws SQLException { return (String) _struct.getAttribute(0); } } 3. 主类进行消息处理 package org.kevin.jms; import java.util.Properties; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.Session; import oracle.jms.AQjmsAdtMessage; import oracle.jms.AQjmsDestination; import oracle.jms.AQjmsFactory; import oracle.jms.AQjmsSession; / @author 李文锴 消息处理类 / public class Main { public static void main(String[] args) throws Exception { JmsConfig config = new JmsConfig(); QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(config.jdbcUrl, new Properties()); QueueConnection conn = queueConnectionFactory.createQueueConnection(config.username, config.password); AQjmsSession session = (AQjmsSession) conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); Queue queue = (AQjmsDestination) session.getQueue(config.username, config.queueName); MessageConsumer consumer = session.createConsumer(queue, null, QUEUE_MESSAGE_TYPE.getFactory(), null, false); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { System.out.println("ok"); AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message; try { QUEUE_MESSAGE_TYPE payload = (QUEUE_MESSAGE_TYPE) adtMessage.getAdtPayload(); System.out.println(payload.getContent()); } catch (Exception e) { e.printStackTrace(); } } }); Thread.sleep(1000000); } } 使用Oracle程序块进行入队操作,在没有启动Java时看到队列表中存在数据。启动Java后,控制台正确的输出的消息;通过Oracle程序块再次写入消息,发现控制台正确处理消息。Java的JMS监听不是立刻进行处理,可能存在几秒中的时间差,时间不等。 三、监控表记录变化通知Java 下面的例子创建一个数据表,然后在表中添加触发器,当数据变化后触发器调用存储过程给Oracle AQ发送消息,然后使用Java JMS对消息进行处理。 1. 创建表 创建student表,包含username和age两个子段,其中username时varchar2类型,age时number类型。 2. 创建存储过程 创建send_aq_msg存储过程,因为存储过程中调用dbms数据包,系统包在存储过程中执行需要进行授权(使用sys用户进行授权): grant execute on dbms_aq to ckevin; 注意存储过程中包含commit语句。 create or replace PROCEDURE send_aq_msg (info IN VARCHAR2) as r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T; r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_message_handle RAW(16); o_payload demo_queue_payload_type; begin o_payload := demo_queue_payload_type(info); dbms_aq.enqueue( queue_name => 'demo_queue', enqueue_options => r_enqueue_options, message_properties => r_message_properties, payload => o_payload, msgid => v_message_handle ); commit; end send_aq_msg; 3. 创建触发器 在student表中创建触发器,当数据写入或更新时,如果age=18,则进行入队操作。需要调用存储过程发送消息,但触发器中不能包含事物提交语句,因此需要使用pragma autonomous_transaction;声明自由事物: CREATE OR REPLACE TRIGGER STUDENT_TR AFTER INSERT OR UPDATE OF AGE ON STUDENT FOR EACH ROW DECLARE pragma autonomous_transaction; BEGIN if :new.age = 18 then send_aq_msg(:new.username); end if; END; 创建完触发器后向执行插入或更新操作: insert into student (username,age) values ('jack.lee.3k', 18); update student set age=18 where username='jack003'; Java JMS可以正确的处理消息。 本篇文章为转载内容。原文链接:https://blog.csdn.net/weixin_42309178/article/details/115241521。 该文由互联网用户投稿提供,文中观点代表作者本人意见,并不代表本站的立场。 作为信息平台,本站仅提供文章转载服务,并不拥有其所有权,也不对文章内容的真实性、准确性和合法性承担责任。 如发现本文存在侵权、违法、违规或事实不符的情况,请及时联系我们,我们将第一时间进行核实并删除相应内容。
2023-12-17 14:22:22
138
转载
PostgreSQL
SQL优化工具使用不当,导致SQL执行效率低下:PostgreSQL实战解析 在数据库管理领域,PostgreSQL凭借其强大的功能和稳定性赢得了众多开发者和企业的青睐。不过,在实际操作的时候,我们偶尔会碰到这种情况:即使已经启用了SQL优化工具,查询速度还是没法让人满意,感觉有点儿不尽人意。本文要带你踏上一段趣味横生的旅程,我们会通过一系列鲜活的例子,手把手教你如何巧妙地运用SQL优化工具,从而在PostgreSQL这个大家伙里头,成功躲开那些拖慢数据库效率的低效SQL问题。 1. SQL优化工具的作用与问题引入 SQL优化工具通常可以帮助我们分析SQL语句的执行计划、索引使用情况以及潜在的资源消耗等,以便于我们对SQL进行优化改进。在实际操作中,如果咱们对这些工具的认识和运用不够熟练精通的话,那可能会出现“优化”不成,反而帮了倒忙的情况,让SQL的执行效率不升反降。 例如,假设我们在一个包含数百万条记录的orders表中查找特定用户的订单: sql -- 不恰当的SQL示例 SELECT FROM orders WHERE user_id = 'some_user'; 虽然可能有针对user_id的索引,但如果直接运行此查询并依赖优化工具盲目添加或调整索引,而不考虑查询的具体内容(如全表扫描),可能会导致SQL执行效率下降。 2. 理解PostgreSQL的查询规划器与执行计划 在PostgreSQL中,查询规划器负责生成最优的执行计划。要是我们没找准时机,灵活运用那些SQL优化神器,那么这个规划器小家伙,可能就会“迷路”,选了一条并非最优的执行路线。比如,对于上述例子,更好的方式是只选择需要的列而非全部: sql -- 更优的SQL示例 SELECT order_id, order_date FROM orders WHERE user_id = 'some_user'; 同时,结合EXPLAIN命令查看执行计划: sql EXPLAIN SELECT order_id, order_date FROM orders WHERE user_id = 'some_user'; 这样,我们可以清晰地了解查询是如何执行的,包括是否有效利用了索引。 3. 错误使用索引优化工具的案例分析 有时候,我们可能过于依赖SQL优化工具推荐的索引创建策略。例如,工具可能会建议为每个经常出现在WHERE子句中的字段创建索引。但这样做并不总是有益的,尤其是当涉及多列查询或者数据分布不均匀时。 sql -- 错误的索引创建示例 CREATE INDEX idx_orders_user ON orders (user_id); 如果user_id字段值分布非常均匀,新创建的索引可能不会带来显著性能提升。相反,综合考虑查询模式创建复合索引可能会更有效: sql -- 更合适的复合索引创建示例 CREATE INDEX idx_orders_user_order_date ON orders (user_id, order_date); 4. 结论与反思 面对SQL执行效率低下,我们需要深度理解SQL优化工具背后的原理,并结合具体业务场景进行细致分析。只有这样,才能避免因为工具使用不当而带来的负面影响。所以呢,与其稀里糊涂地全靠自动化工具,咱们还不如踏踏实实地去深入了解数据库内部是怎么运转的,既要明白表面现象,更要摸透背后的原理。这样一来,咱就能更接地气、更靠谱地制定出高效的SQL优化方案了。 总之,在PostgreSQL的世界里,SQL优化并非一蹴而就的事情,它要求我们具备严谨的逻辑思维、深入的技术洞察以及灵活应变的能力。让我们在实践中不断学习、思考和探索,共同提升PostgreSQL的SQL执行效率吧! 注:全表扫描在数据量巨大时往往意味着较低的查询效率,尤其当仅需少量数据时。
2023-09-28 21:06:07
263
冬日暖阳
Redis
...你有个超大的储物间(数据库或者其他服务),里面塞满了各种好玩意儿(数据),想拿啥就能拿啥!嘿,想象一下,现在有一群小毛贼(服务实例)都盯上了你的那些值钱的小宝贝,可不能让他们随便进来顺手牵羊啊!所以呢,你就得准备一把“神奇的钥匙”(锁),谁要是想进去拿东西,就必须先拿到这把钥匙才行。没有钥匙?不好意思,请自觉退散吧! 为什么要用分布式锁呢?因为在线上系统里,多台机器可能会同时操作同一个资源,比如抢购商品这种场景。如果没有锁机制的话,就可能出现重复下单、库存超卖等问题。分布式锁嘛,简单说就是抢车位的游戏规则——在同一时间里,只能有一个家伙抢到那个“资源位”,别的家伙就只能乖乖排队等着轮到自己啦! 不过说起来容易做起来难啊,尤其是在分布式环境下,网络延迟、机器宕机等问题会带来各种意想不到的情况。嘿,今天咱们就来唠唠,在Redis这个超级工具箱里,怎么才能整出个靠谱的分布式锁! --- 2. Redis为什么适合用来做分布式锁? 嘿,说到Redis,相信很多小伙伴都对它不陌生吧?Redis是一个基于内存的高性能键值存储系统,速度贼快,而且支持多种数据结构,比如字符串、哈希表、列表等等。最重要的是,它提供了原子性的操作指令,比如SETNX(Set if Not Exists),这让我们能够轻松地实现分布式锁! 让我给你们讲个小故事:有一次我尝试用数据库来做分布式锁,结果发现性能特别差劲,查询锁状态的SQL语句每次都要扫描整个表,效率低得让人抓狂。换了Redis之后,简直像开了挂一样,整个系统都丝滑得不行!Redis这玩意儿不光跑得快,还自带一堆黑科技,像什么过期时间、消息订阅啥的,这些功能简直就是搞分布式锁的神器啊! 所以,如果你也在纠结选什么工具来做分布式锁,强烈推荐试试Redis!接下来我会结合实际案例给你们展示具体的操作步骤。 --- 3. 实现分布式锁的基本思路 首先,我们要明确分布式锁需要满足哪些条件: 1. 互斥性 同一时刻只能有一个客户端持有锁。 2. 可靠性 即使某个客户端崩溃了,锁也必须自动释放,避免死锁。 3. 公平性 排队等待的客户端应该按照请求顺序获取锁。 4. 可重入性(可选) 允许同一个客户端多次获取同一个锁。 现在我们就来一步步实现这些功能。 示例代码 1:最基本的分布式锁实现 python import redis import time def acquire_lock(redis_client, lock_key, timeout=10): 尝试加锁,设置过期时间为timeout秒 result = redis_client.set(lock_key, "locked", nx=True, ex=timeout) return bool(result) def release_lock(redis_client, lock_key): 使用Lua脚本来保证解锁的安全性 script = """ if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end """ redis_client.eval(script, keys=[lock_key], args=["locked"]) 这段代码展示了最基础的分布式锁实现方式。我们用set命令设置了两个参数:一个是NX,意思是“只在key不存在的时候才创建”,这样就能避免重复创建;另一个是EX,给这个锁加了个过期时间,相当于设了个倒计时,万一客户端挂了或者出问题了,锁也能自动释放,就不会一直卡在那里变成死锁啦。最后,解锁的时候我们用了Lua脚本,这样可以保证操作的原子性。 --- 4. 如何解决锁的隔离性问题? 诶,说到这里,问题来了——如果两个不同的业务逻辑都需要用到同一个锁怎么办?比如订单系统和积分系统都想操作同一个用户的数据,这时候就需要考虑锁的隔离性了。换句话说,我们需要确保不同业务逻辑之间的锁不会互相干扰。 示例代码 2:基于命名空间的隔离策略 python def acquire_namespace_lock(redis_client, namespace, lock_name, timeout=10): 构造带命名空间的锁名称 lock_key = f"{namespace}:{lock_name}" result = redis_client.set(lock_key, "locked", nx=True, ex=timeout) return bool(result) def release_namespace_lock(redis_client, namespace, lock_name): lock_key = f"{namespace}:{lock_name}" script = """ if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end """ redis_client.eval(script, keys=[lock_key], args=["locked"]) 在这个版本中,我们在锁的名字前面加上了命名空间前缀,比如orders:place_order和points:update_score。这样一来,不同业务逻辑就可以使用独立的锁,避免相互影响。 --- 5. 进阶 如何处理锁竞争与性能优化? 当然啦,现实中的分布式锁并不会总是那么顺利,有时候会出现大量请求同时争抢同一个锁的情况。这时我们可能需要引入队列机制或者批量处理的方式来降低系统的压力。 示例代码 3:使用Redis的List模拟队列 python def enqueue_request(redis_client, queue_key, request_data): redis_client.rpush(queue_key, request_data) def dequeue_request(redis_client, queue_key): return redis_client.lpop(queue_key) def process_queue(redis_client, lock_key, queue_key): while True: 先尝试获取锁 if not acquire_lock(redis_client, lock_key): time.sleep(0.1) 等待一段时间再重试 continue 获取队列中的第一个请求并处理 request = dequeue_request(redis_client, queue_key) if request: handle_request(request) 释放锁 release_lock(redis_client, lock_key) 这段代码展示了如何利用Redis的List结构来管理请求队列。想象一下,好多用户一起抢同一个东西,场面肯定乱哄哄的对吧?这时候,咱们就让他们老老实实排成一队,然后派一个专门的小哥挨个儿去处理他们的请求。这样一来,大家就不会互相“打架”了,事情也能更顺利地办妥。 --- 6. 总结与反思 兄弟们,通过今天的讨论,我相信大家都对如何在Redis中实现分布式锁有了更深刻的理解了吧?虽然Redis本身已经足够强大,但我们仍然需要根据实际需求对其进行适当的扩展和优化。比如刚才提到的命名空间隔离、队列机制等,这些都是非常实用的小技巧。 不过呢,我也希望大家能记住一点——技术永远不是一成不变的。业务越做越大,技术也日新月异的,咱们得不停地充电,学点新鲜玩意儿,试试新招数才行啊!就像今天的分布式锁一样,也许明天就会有更高效、更优雅的解决方案出现。所以,保持好奇心,勇于探索未知领域,这才是程序员最大的乐趣所在! 好了,今天就聊到这里啦,祝大家在编程的路上越走越远!如果有任何疑问或者想法,欢迎随时找我交流哦~
2025-04-22 16:00:29
58
寂静森林
转载文章
...删除相应内容。 1.创建表:create table 表名(属性名 数据类型[约束条件],…); Paimary key 主键 auto_increment自增 foreign key 外键 references 另一表名(字段名).–>外键这个表连接着另外一个表的哪个键. 删除表: drop table 表名;–>表结构也删除了(也即是这个表没了) Truncate table 表名 --> 只删除表中数据,表结构不会删除. 2.In 与 not in 在或不在这个(1,3)里面,单个查询,只会查询(1或者3) 3.Between and 与 not … 和上面差不多,Between 1 and 3 但是这个是范围查询(1,3) 1-3 之间(包含1,3) 4.Like,模糊查询 “%” 代表任意字符,”_”代表单个字符. 5.Is Not null 与 is null 是否为空 6.And 与 or 一个是所有条件都要完成,or则是任何一个条件完成即可 7.Distinct 去重 8.Order by age asc 与 desc 排序,假如根据age排序,asc正序(升序默认),desc倒叙(降序) 9.Gruop by 分组查询,单独使用无意义,group_concat(字段),拼接,若是根据age group by 则会发现age一样的会出现在同一字段内 例如: : 最后要注意group by 后面的字段与所查字段的关系(一对一),当然还有having,having和where基本一样,只不过跟在group by后面. 10.Limit 分页查询 limit 0,5 .查询前5条数据,从0开始,5结束,但是5取不到,也即是取头不取尾. 11.聚合函数:count() 查询数据的总数据量 经常使用别名 例如:as total sum(字段)函数:求和…若字段为成绩,where条件或gruop by 为个人的id,那么查出的就是个人的成绩总分. AVG(字段),但是查的是平均分,min(字段)与max(字段) 查出最小或最大. 三者都类似sum(),当然max()与min()若是在最前面使用,就会当条件查询只会出来这一笔数据.例如: 12.Sql多表查询,内连接不只是inner join,平时写的from a表,b表 where 条件这也是内连接,意思就是两张表中数据都有才可以查询出来 13.而外连接分为左连接和右连接,意思是以左表或右表为主,假如两张表,左表数据多,右表数据少,且条件符合,则左连接的时候左表数据全部出来,右表没有的为null,反之也是一样. 14.Exist() 与 not exist() …()内的数据是否为空,若是为空则代表false,返回数据为空,若不为空,则代表true,正常查询. 15.Any 与 all 例如 age > any(age1,age2) 大于两者中的一个就可以,但是all的情况下则是全部大于.也就是相当于,any为大于最小的,all则是大于最大的就行了,当然若是小于号那就是另外一种情况了,另外分析. 16.Union,(也就是联合的意思,自带distinct,重复的去除)用法,例如两张表的id要全部查出来,则:select id from A union select id from B ,若Aid为1,2,3,Bid为1,2,4.则查出来的数据为1.2.3.4,若是union all,则不带distinct,用法一样,查出来以后为1.2.3.1.2.4. 17.给表取别名,表名 空格 别名 给字段取别名 字段名 as 别名. 18.Insert插入数据时若是使用insert into 表名 values();主键必须到写进去,当然与其他数据不相同即可,若是自增,可以写null.若是insert into 表名(字段)values(值),这时插入数据,字段不用写主键字段,写入其他数据字段名与值就可以完成数据的添加.(主键自己生成为前提,UUID,auto_increament都可以). 19.Insert into 插入多条数据时,其他与18一样,只不过由values()变成了values(),(),(); 20.索引是由数据库表中一列或多列组合而成,其作用提高对表数据的查询速度.像图书目录. 优缺点:优:提高了查询数据的效率.缺:创建和维护索引的时间增加了(内容改了,目录也要改). 21.索引分类:普通索引,唯一性索引UNIQUE(unique修饰,例如主键),全文索引FULLTEXT(创建在文本上,例如:char,varchar,varchar2等,mysql默认引擎不支持,),单列索引:单个字段建立索引,多列索引:多个字段创建一个索引,空间索引SPATIAL:不常用(mysql默认引擎不支持) 22.创建索引: index为关键字,或者key (1)可以index(字段名)–>普通索引 (2)Unique index(字段名)–>唯一索引 (3)Unique index 别名(字段名)–>取别名的唯一索引 (4)index 别名(字段名1,字段名2)–>取别名的多列索引 1.创建表的时候创建索引, 前三个为参数修饰,唯一性,全文,空间索引; 2.在已存在的表上创建索引,或者用ALTER TABLE 表名 ADD 索引,也就是用修改表的形式来创建索引 Create index 索引别名 on 表名(字段名) -->普通单列索引 Create index 索引别名 on 表名(字段名1,字段名2) -->多列索引 Create unique index 索引别名 on 表名(字段名) -->唯一单列索引 Alter table 表名 add +(1)|(2)|(3)|(4)即可. 23.删除索引: drop index 索引名 on 表名. 24.NOW(); mysql的函数,表示当前时间 25.视图:是一个虚拟的表,没有物理数据,是从其他表中导出的数据,当原表数据发生改变时,视图数据也会发生改变,反之也一样. (1)作用:操作简单化;增加数据安全性:不直接对表进行操作;提高表的逻辑性:原表修改字段对视图无影响. (2)创建视图:语法:create view 视图名 as 查询语句. 例如:create view vi as select id,name from user;–>这是把user中id,name字段的数据写入到vi视图中. 若是想自己定义字段名不用查出的字段名,可以如下面这样写. 例如:create view vi(vi_id,vi_name) as select id,name from user;–>这样的话id对应vi_id,name对应vi_name; 上面的都是单表的视图,多表的视图也是一样的,只不过后面的单表查询变成多表查询了. 建议创建视图后自己定义字段名,也即是定义别名. (3)查看视图: Describe(desc) 视图名–>查看视图基本信息 Show table status like ‘视图名’ --> 查看视图基本信息 Show create view 视图名 --> 视图详细信息,建表具体信息. 在view表中查看视图详细信息–>view 系统表 自带的. (4)修改视图:修改使徒的定义 Create or replace view 没有的话就创建,有的话就替换 例如:Create or replace view vi(id,name) as select语句. Alter view 只修改不能创建(也就是说视图必须存在的情况下才可修改) Alter view vi as select语句 (5)更新视图:视图是虚拟的,对视图进行的crud操作都会对原表的数据产生影响. 也就是说对视图的操作最后都会转换为对视图所连接那个表的操作. (6)删除视图:删除数据库中已存在的视图,视图为虚表,因此只会删除结构,不会删除数据. Drop view if exist 视图名. 26.触发器:由事件来触发某个操作,这些事件包括insert语句,update语句和delete语句.当数据库系统执行这些事件时,就会激活触发器执行相应的方法. 创建触发器:create trigger 触发器名 (before/after) 触发事件 on 表名 for each row sql语句. 这里的new是指代新插入的拿一条数据(更新的也算),若是old的话,指的是删除的那一条数据(更新之前的数据).(new和old属于过渡变量) 这条触发器的意思时:当t_book有插入数据时,就会根据新插入数据的id找到t_bookType的id,并试该条数据的bookNum加1. Begin与end写sql语句,中间可以写多条sql语句用分号;分隔开…也即是说语句要写完成,不能少分号. Delimiter | 设置分隔符,要不然好像只会执行begin与and之间的第一条sql语句. 查看触发器: 1.show triggers; 语句查看触发器信息.(查询所有的触发器) 2.在triggers表中查看触发器信息.(在数据库原始表triggers中可以查看) 删除触发器: Drop trigger 触发器名称 ; 27.函数: (1)日期函数: CURDATE()当前日期,CURTIME()当前时间,MONTH(d):返回日期d中的月份值,范围试1-12 (2)字符串函数:CHAR_LENGTH(s) 计算字段s值->字符串的长度.UPPER(s) 把该字段的值中所有英文都变成大写,LOWER(s) 和相面相反->把英文都变成小写. (3)数学函数:sum():求和,ABS(s) 求绝对值,SQRT(s):求平方根,mod(x,y),求余x/y (4)加密函数:PASSWORD(STR) 一般对密码加密 不可逆… MD5(STR) 普通加密 ,不可逆. ENCODE(str,pswd_str) 加密函数,结果是一个二进制文件,用blob类型的字段保存,pswd_str类似一个加密的钥匙,可以随便写. DECODE(被加密的值,pswd_str)–>对encode进行解密. 28.存储过程: (1)存储过程和函数:两者是在数据库中定义一些SQL语句的集合,然后直接调用这些存储过程和函数来执行已经定义好的SQL语句.存储过程和函数可以避免重复的写一些sql语句,而且存储过程是在mysql服务器中存储和执行的,减少客户端和服务器端的数据传输.(类似于java代码写的工具类.) (2)创建存储过程和函数: Create procedure 关键字 pro_book 存储过程名称, in 输入 bT 输入参数名称 int 输入参数类型 out 输出 count_num 输出参数名称 int 输入参数类型 Begin 过程开始 end过程结束 中间是sql语句, Delimiter 默认是分号,而他的作用就是若是遇见分号时就开始执行该过程(语句),但是一个存储过程可能有很多sql语句且以分号结束,若这样的情况下当第一条sql语句结束后就会开始执行该过程,产生的后果是创建过程时,执行到第一个分号就会开始创建,导致存储过程创建错误.(若是有多个参数,在多条sql中均有参数,第一条设置完执行了,而这时第二条的参数有可能还么有设置完成,导致sql执行失败.)因此,需要把默认执行过程的demiliter关键字的默认值改为其他的字符,例如上面的就是改为&&,(当然我认为上面就一条sql语句,改不改默认的demiliter的默认值都一样.) . 使用navicat的话不使用delimiter好像也是可以的. Reads sql data则是上面图片所提到的参数指定存储过程的特性.(这个是指读数据,当然还有写输入与读写数据专用的参数类型.)看下图 经常用contains sql (应该是可以读,) 这个是调用上面的存储过程,1为入参,@total相当于全局变量,为出参. 这是一个存储函数,create function 为关键字,fun_book为函数名称, 括号里面为传入的参数名(值)以及入参的类型.RETURNS 为返回的关键字,后面接返回的类型. BEGIN函数开始,END函数结束.中间是return 以及查询数据的sql语句, 这里是指把bookId 传进去,通过存储函数返回对应的书本名字, ---------存储函数的调用和调用系统函数一样 例如:select 存储函数名称(入参值) Select 为查询 func_book 为存储函数名 2为入参值. (3)变量的使用:declaer:声明变量的值 Delimiter && Create procedure user() Begin Declare a,b varchar2(20) ; — a,b有默认的值,为空 Insert into user values(a,b); End && Delimiter ; Set 可以用来赋值,例如: 可以从其他表中查询出对应的值插入到另一个表中.例如: 从t_user2中查询出username2与password2放入到变量a,b中,然后再插入到t_user表中.(当然这只是创建存储过程),创建完以后,需要用CALL 存储过程名(根据过程参数描写.)来调用存储过程.注意:这一种的写法只可以插入单笔数据,若是select查询出多笔数据,因为无循环故而会插入不进去语句,会导致倒致存储过程时出错.下面的游标也是如此. (4)游标的使用.查询语句可能查询出多条记录,在存储过程和函数中使用游标逐条读取查询结果集中的记录.游标的使用包括声明游标,打开游标,使用游标和关闭游标.游标必须声明到处理程序之前,并且声明在变量和条件之后. 声明:declare 游标名 curson for 查询sql语句. 打开:open 游标名 使用:fetch 游标名 into x, 关闭:close 游标名 ----- 游标只能保存单笔数据. 类似于这一个,意思就是先查询出来username2,与password2的值放入到cur_t_user2的游标中(声明,类似于赋值),然后开启->使用.使用的意思就是把游标中存储的值分别赋值到a,b中,然后执行sql语句插入到t_user表中.最后关闭游标. (5)流程控制的使用:mysql可以使用:IF 语句 CASE语句 LOOP语句 LEAVE语句 ITERATE 语句 REPEAT语句与WHILE语句. 这个过程的意思是,查询t_user表中是否存在id等于我们入参时所写的id,若有的情况下查出有几笔这样的数据并且把数值给到全局变量@num中,if判断是否这样的数据是否存在,若是存在执行THEN后面的语句,即使更新该id对应的username,若没有则插入一条新的数据,最后注意END IF. 相当于java中的switch case.例如: 这里想当然于,while(ture){ break; } 这里的意思是,参数一个int类型的参数,loop aaa循环,把参数当做主键id插入到t_user表中,每循环一次参入的参数值减一,直到参数值为0,跳出循环(if判断,leave实现.) 相当于java的continue. 比上面的多了一个当totalNum = 3时,结束本次循环,下面的语句不在执行,直接执行下一次循环,也即是说插入的数据没有主键为3的数据. 和上面的差不多,只不过当执行到UNTIL时满足条件时,就跳出循环.就如上面那一个意思就是当执行到totalNum = 1时,跳出循环,也就是说不会插入主键为0的那一笔数据 当while条件判断为true时,执行do后面的语句,否则就不再执行. (6)调用存储过程和函数 CALL 存储过程名字(参数值1,参数值2,…) 存储函数名称(参数值1,参数值2,…) (7)查看存储过程和函数. Show procedure status like ‘存储过程名’ --只能查看状态 Show create procedure ‘存储过程名’ – 查看定义(使用频率高). 存储函数查看也和上面的一样. 当然还可以从information_schema.Routines中(系统数据库表)查看存储过程与函数. (8)修改存储过程与函数: 修改存储过程comment属性的值 ALTER procedure 存储过程名 comment ‘新值’; (9)删除存储过程与函数: DROP PROCEDURE 存储过程名; DROP function 存储函数名; 29.数据备份与还原: (1)数据备份:数据备份可以保证数据库表的安全性,数据库管理员需要定期的进行数据库备份. 命令:使用mysqldump(下图),或者使用图形工具 Mysqldump在msql文件夹+bin+mysqldump.exe中,相当于一个小软件.执行的话是在dos命令窗操作的. 其实就是导出数据库数据,在navacat中可以如下图导出 (2)数据还原: 若是从navacat中就是把外部的.sql文件数据导入到数据库中去.如下图 本篇文章为转载内容。原文链接:https://blog.csdn.net/qq_42847571/article/details/102686087。 该文由互联网用户投稿提供,文中观点代表作者本人意见,并不代表本站的立场。 作为信息平台,本站仅提供文章转载服务,并不拥有其所有权,也不对文章内容的真实性、准确性和合法性承担责任。 如发现本文存在侵权、违法、违规或事实不符的情况,请及时联系我们,我们将第一时间进行核实并删除相应内容。
2023-04-26 19:09:16
83
转载
转载文章
...视化操作,二是后台的数据库管理。网管对前台的管理和维护工作包括保障网络链路通畅、处理MIS终端的突发事件以及对操作员的管理、培训等,这是网管们日常做得最多、最辛苦的功课;然而MIS系统架构中同等重要的针对数据库的管理、维护和优化工作,现实中似乎并没有得到网管朋友的足够重视,看起来这都是程序员的事,事实上,一个网管如果能在MIS设计期间就数据表的规范化、表索引优化、容量设计、事务处理等诸多方面与程序员进行卓有成效的沟通和协作,那么日常的前台管理工作将会变得大为轻松,因为在某种意义上,数据库管理系统就相当于操作系统,在系统中占有同样重要的位置。 这正是SQL SERVER等数据库管理系统和dBASEX、ACCESS等数据库文件系统的本质区别,所以,对数据库管理系统操作能力的强弱在某种程度上也折射出了网管的水平——个人认为,称得上优秀的Admin,至少应该是一个称职的DBA(数据库管理员)。 下面以SQL SERVER(下称 SQLS)为例,将数据库管理中难于理解的“索引原理”问题给各位朋友作一个深入浅出的介绍。其他的数据库管理系统如Oracle、Sybase等,朋友们可以融会贯通,举一反三。 一、数据表的基本结构 建立数据库的目的是管理大量数据,而建立索引的目的就是提高数据检索效率,改善数据库工作性能,提高数据访问速度。对于索引,我们要知其然,更要知其所以然,关键在于认识索引的工作原理,才能更好的管理索引。 为认识索引工作原理,首先有必要对数据表的基本结构作一次全面的复习。 SQLS当一个新表被创建之时,系统将在磁盘中分配一段以8K为单位的连续空间,当字段的值从内存写入磁盘时,就在这一既定空间随机保存,当一个8K用完的时候,SQLS指针会自动分配一个8K的空间。这里,每个8K空间被称为一个数据页(Page),又名页面或数据页面,并分配从0-7的页号,每个文件的第0页记录引导信息,叫文件头(File header);每8个数据页(64K)的组合形成扩展区(Extent),称为扩展。全部数据页的组合形成堆(Heap)。 SQLS规定行不能跨越数据页,所以,每行记录的最大数据量只能为8K。这就是char和varchar这两种字符串类型容量要限制在8K以内的原因,存储超过8K的数据应使用text类型,实际上,text类型的字段值不能直接录入和保存,它只是存储一个指针,指向由若干8K的文本数据页所组成的扩展区,真正的数据正是放在这些数据页中。 页面有空间页面和数据页面之分。 当一个扩展区的8个数据页中既包含了空间页面又包括了数据或索引页面时,称为混合扩展(Mixed Extent),每张表都以混合扩展开始;反之,称为一致扩展(Uniform Extent),专门保存数据及索引信息。 表被创建之时,SQLS在混合扩展中为其分配至少一个数据页面,随着数据量的增长,SQLS可即时在混合扩展中分配出7个页面,当数据超过8个页面时,则从一致扩展中分配数据页面。 空间页面专门负责数据空间的分配和管理,包括:PFS页面(Page free space):记录一个页面是否已分配、位于混合扩展还是一致扩展以及页面上还有多少可用空间等信息;GAM页面(Global allocation map)和SGAM页面(Secodary global allocation map):用来记录空闲的扩展或含有空闲页面的混合扩展的位置。SQLS综合利用这三种类型的页面文件在必要时为数据表创建新空间; 数据页或索引页则专门保存数据及索引信息,SQLS使用4种类型的数据页面来管理表或索引:它们是IAM页、数据页、文本/图像页和索引页。 在WINDOWS中,我们对文件执行的每一步操作,在磁盘上的物理位置只有系统(system)才知道;SQL SERVER沿袭了这种工作方式,在插入数据的过程中,不但每个字段值在数据页面中的保存位置是随机的,而且每个数据页面在“堆”中的排列位置也只有系统(system)才知道。 这是为什么呢?众所周知,OS之所以能管理DISK,是因为在系统启动时首先加载了文件分配表:FAT(File Allocation Table),正是由它管理文件系统并记录对文件的一切操作,系统才得以正常运行;同理,作为管理系统级的SQL SERVER,也有这样一张类似FAT的表存在,它就是索引分布映像页:IAM(Index Allocation Map)。 IAM的存在,使SQLS对数据表的物理管理有了可能。 IAM页从混合扩展中分配,记录了8个初始页面的位置和该扩展区的位置,每个IAM页面能管理512,000个数据页面,如果数据量太大,SQLS也可以增加更多的IAM页,可以位于文件的任何位置。第一个IAM页被称为FirstIAM,其中记录了以后的IAM页的位置。 数据页和文本/图像页互反,前者保存非文本/图像类型的数据,因为它们都不超过8K的容量,后者则只保存超过8K容量的文本或图像类型数据。而索引页顾名思义,保存的是与索引结构相关的数据信息。了解页面的问题有助我们下一步准确理解SQLS维护索引的方式,如页拆分、填充因子等。 二、索引的基本概念 索引是一种特殊类型的数据库对象,它与表有着密切的联系。 索引是为检索而存在的。如一些书籍的末尾就专门附有索引,指明了某个关键字在正文中的出现的页码位置,方便我们查找,但大多数的书籍只有目录,目录不是索引,只是书中内容的排序,并不提供真正的检索功能。可见建立索引要单独占用空间;索引也并不是必须要建立的,它们只是为更好、更快的检索和定位关键字而存在。 再进一步说,我们要在图书馆中查阅图书,该怎么办呢?图书馆的前台有很多叫做索引卡片柜的小柜子,里面分了若干的类别供我们检索图书,比如你可以用书名的笔画顺序或者拼音顺序作为查找的依据,你还可以从作者名的笔画顺序或拼音顺序去查询想要的图书,反正有许多检索方式,但有一点很明白,书库中的书并没有按照这些卡片柜中的顺序排列——虽然理论上可以这样做,事实上,所有图书的脊背上都人工的粘贴了一个特定的编号①,它们是以这个顺序在排列。索引卡片中并没有指明这本书摆放在书库中的第几个书架的第几本,仅仅指明了这个特定的编号。管理员则根据这一编号将请求的图书返回到读者手中。这是很形象的例子,以下的讲解将会反复用到它。 SQLS在安装完成之后,安装程序会自动创建master、model、tempdb等几个特殊的系统数据库,其中master是SQLS的主数据库,用于保存和管理其它系统数据库、用户数据库以及SQLS的系统信息,它在SQLS中的地位与WINDOWS下的注册表相当。 master中有一个名为sysindexes的系统表,专门管理索引。SQLS查询数据表的操作都必须用到它,毫无疑义,它是本文主角之一。 查看一张表的索引属性,可以在查询分析器中使用以下命令:select from sysindexes where id=object_id(‘tablename’) ;而要查看表的索引所占空间的大小,可以使用系统存储过程命令:sp_spaceused tablename,其中参数tablename为被索引的表名。 三、平衡树 如果你通过书后的索引知道了一个关键字所在的页码,你有可能通过随机的翻寻,最终到达正确的页码。但更科学更快捷的方法是:首先把书翻到大概二分之一的位置,如果要找的页码比该页的页码小,就把书向前翻到四分之一处,否则,就把书向后翻到四分之三的地方,依此类推,把书页续分成更小的部分,直至正确的页码。这叫“两分法”,微软在官方教程MOC里另有一种说法:叫B树(B-Tree,Balance Tree),即平衡树。 一个表索引由若干页面组成,这些页面构成了一个树形结构。B树由“根”(root)开始,称为根级节点,它通过指向另外两个页,把一个表的记录从逻辑上分成两个部分:“枝”—--非叶级节点(Non-Leaf Level);而非叶级节点又分别指向更小的部分:“叶”——叶级节点(Leaf Level)。根节点、非叶级节点和叶级节点都位于索引页中,统称为索引节点,属于索引页的范筹。这些“枝”、“叶”最终指向了具体的数据页(Page)。在根级节点和叶级节点之间的叶又叫数据中间页。 “根”(root)对应了sysindexes表的Root字段,其中记载了非叶级节点的物理位置(即指针);非叶级节点位于根节点和叶节点之间,记载了指向叶级节点的指针;而叶级节点则最终指向数据页。这就是“平衡树”。 四、聚集索引和非聚集索引 从形式上而言,索引分为聚集索引(Clustered Indexes)和非聚集索引(NonClustered Indexes)。 聚集索引相当于书籍脊背上那个特定的编号。如果对一张表建立了聚集索引,其索引页中就包含着建立索引的列的值(下称索引键值),那么表中的记录将按照该索引键值进行排序。比如,我们如果在“姓名”这一字段上建立了聚集索引,则表中的记录将按照姓名进行排列;如果建立了聚集索引的列是数值类型的,那么记录将按照该键值的数值大小来进行排列。 非聚集索引用于指定数据的逻辑顺序,也就是说,表中的数据并没有按照索引键值指定的顺序排列,而仍然按照插入记录时的顺序存放。其索引页中包含着索引键值和它所指向该行记录在数据页中的物理位置,叫做行定位符(RID:Row ID)。好似书后面的的索引表,索引表中的顺序与实际的页码顺序也是不一致的。而且一本书也许有多个索引。比如主题索引和作者索引。 SQL Server在默认的情况下建立的索引是非聚集索引,由于非聚集索引不对表中的数据进行重组,而只是存储索引键值并用一个指针指向数据所在的页面。一个表如果没有聚集索引时,理论上可以建立249个非聚集索引。每个非聚集索引提供访问数据的不同排序顺序。 五、数据是怎样被访问的 若能真正理解了以上索引的基础知识,那么再回头来看索引的工作原理就简单和轻松多了。 (一)SQLS怎样访问没有建立任何索引数据表: Heap译成汉语叫做“堆”,其本义暗含杂乱无章、无序的意思,前面提到数据值被写进数据页时,由于每一行记录之间并没地有特定的排列顺序,所以行与行的顺序就是随机无序的,当然表中的数据页也就是无序的了,而表中所有数据页就形成了“堆”,可以说,一张没有索引的数据表,就像一个只有书柜而没有索引卡片柜的图书馆,书库里面塞满了一堆乱七八糟的图书。当读者对管理员提交查询请求后,管理员就一头钻进书库,对照查找内容从头开始一架一柜的逐本查找,运气好的话,在第一个书架的第一本书就找到了,运气不好的话,要到最后一个书架的最后一本书才找到。 SQLS在接到查询请求的时候,首先会分析sysindexes表中一个叫做索引标志符(INDID: Index ID)的字段的值,如果该值为0,表示这是一张数据表而不是索引表,SQLS就会使用sysindexes表的另一个字段——也就是在前面提到过的FirstIAM值中找到该表的IAM页链——也就是所有数据页集合。 这就是对一个没有建立索引的数据表进行数据查找的方式,是不是很没效率?对于没有索引的表,对于一“堆”这样的记录,SQLS也只能这样做,而且更没劲的是,即使在第一行就找到了被查询的记录,SQLS仍然要从头到尾的将表扫描一次。这种查询称为“遍历”,又叫“表扫描”。 可见没有建立索引的数据表照样可以运行,不过这种方法对于小规模的表来说没有什么太大的问题,但要查询海量的数据效率就太低了。 (二)SQLS怎样访问建立了非聚集索引的数据表: 如前所述,非聚集索引可以建多个,具有B树结构,其叶级节点不包含数据页,只包含索引行。假定一个表中只有非聚集索引,则每个索引行包含了非聚集索引键值以及行定位符(ROW ID,RID),他们指向具有该键值的数据行。每一个RID由文件ID、页编号和在页中行的编号组成。 当INDID的值在2-250之间时,意味着表中存在非聚集索引页。此时,SQLS调用ROOT字段的值指向非聚集索引B树的ROOT,在其中查找与被查询最相近的值,根据这个值找到在非叶级节点中的页号,然后顺藤摸瓜,在叶级节点相应的页面中找到该值的RID,最后根据这个RID在Heap中定位所在的页和行并返回到查询端。 例如:假定在Lastname上建立了非聚集索引,则执行Select From Member Where Lastname=’Ota’时,查询过程是:①SQLS查询INDID值为2;②立即从根出发,在非叶级节点中定位最接近Ota的值“Martin”,并查到其位于叶级页面的第61页;③仅在叶级页面的第61页的Martin下搜寻Ota的RID,其RID显示为N∶706∶4,表示Lastname字段中名为Ota的记录位于堆的第707页的第4行,N表示文件的ID值,与数据无关;④根据上述信息,SQLS立马在堆的第 707页第4行将该记录“揪”出来并显示于前台(客户端)。视表的数据量大小,整个查询过程费时从百分之几毫秒到数毫秒不等。 在谈到索引基本概念的时候,我们就提到了这种方式: 图书馆的前台有很多索引卡片柜,里面分了若干的类别,诸如按照书名笔画或拼音顺序、作者笔画或拼音顺序等等,但不同之处有二:① 索引卡片上记录了每本书摆放的具体位置——位于某柜某架的第几本——而不是“特殊编号”;② 书脊上并没有那个“特殊编号”。管理员在索引柜中查到所需图书的具体位置(RID)后,根据RID直接在书库中的具体位置将书提出来。 显然,这种查询方式效率很高,但资源占用极大,因为书库中书的位置随时在发生变化,必然要求管理员花费额外的精力和时间随时做好索引更新。 (三)SQLS怎样访问建立了聚集索引的数据表: 在聚集索引中,数据所在的数据页是叶级,索引数据所在的索引页是非叶级。 查询原理和上述对非聚集索引的查询相似,但由于记录是按照聚集索引中索引键值进行排序,换句话说,聚集索引的索引键值也就是具体的数据页。 这就好比书库中的书就是按照书名的拼音在排序,而且也只按照这一种排序方式建立相应的索引卡片,于是查询起来要比上述只建立非聚集索引的方式要简单得多。仍以上面的查询为例: 假定在Lastname字段上建立了聚集索引,则执行Select From Member Where Lastname=’Ota’时,查询过程是:①SQLS查询INDID值为1,这是在系统中只建立了聚集索引的标志;②立即从根出发,在非叶级节点中定位最接近Ota的值“Martin”,并查到其位于叶级页面的第120页;③在位于叶级页面第120页的Martin下搜寻到Ota条目,而这一条目已是数据记录本身;④将该记录返回客户端。 这一次的效率比第二种方法更高,以致于看起来更美,然而它最大的优点也恰好是它最大的缺点——由于同一张表中同时只能按照一种顺序排列,所以在任何一种数据表中的聚集索引只能建立一个;并且建立聚集索引需要至少相当于源表120%的附加空间,以存放源表的副本和索引中间页! 难道鱼和熊掌就不能兼顾了吗?办法是有的。 (四)SQLS怎样访问既有聚集索引、又有非聚集索引的数据表: 如果我们在建立非聚集索引之前先建立了聚集索引的话,那么非聚集索引就可以使用聚集索引的关键字进行检索,就像在图书馆中,前台卡片柜中的可以有不同类别的图书索引卡,然而每张卡片上都载明了那个特殊编号——并不是书籍存放的具体位置。这样在最大程度上既照顾了数据检索的快捷性,又使索引的日常维护变得更加可行,这是最为科学的检索方法。 也就是说,在只建立了非聚集索引的情况下,每个叶级节点指明了记录的行定位符(RID);而在既有聚集索引又有非聚集索引的情况下,每个叶级节点所指向的是该聚集索引的索引键值,即数据记录本身。 假设聚集索引建立在Lastname上,而非聚集索引建立在Firstname上,当执行Select From Member Where Firstname=’Mike’时,查询过程是:①SQLS查询INDID值为2;②立即从根出发,在Firstname的非聚集索引的非叶级节点中定位最接近Mike的值“Jose”条目;③从Jose条目下的叶级页面中查到Mike逻辑位置——不是RID而是聚集索引的指针;④根据这一指针所指示位置,直接进入位于Lastname的聚集索引中的叶级页面中到达Mike数据记录本身;⑤将该记录返回客户端。 这就完全和我们在“索引的基本概念”中讲到的现实场景完全一样了,当数据发生更新的时候,SQLS只负责对聚集索引的健值驾以维护,而不必考虑非聚集索引,只要我们在ID类的字段上建立聚集索引,而在其它经常需要查询的字段上建立非聚集索引,通过这种科学的、有针对性的在一张表上分别建立聚集索引和非聚集索引的方法,我们既享受了索引带来的灵活与快捷,又相对规避了维护索引所导致的大量的额外资源消耗。 六、索引的优点和不足 索引有一些先天不足:1:建立索引,系统要占用大约为表的1.2倍的硬盘和内存空间来保存索引。2:更新数据的时候,系统必须要有额外的时间来同时对索引进行更新,以维持数据和索引的一致性——这就如同图书馆要有专门的位置来摆放索引柜,并且每当库存图书发生变化时都需要有人将索引卡片重整以保持索引与库存的一致。 当然建立索引的优点也是显而易见的:在海量数据的情况下,如果合理的建立了索引,则会大大加强SQLS执行查询、对结果进行排序、分组的操作效率。 实践表明,不恰当的索引不但于事无补,反而会降低系统性能。因为大量的索引在进行插入、修改和删除操作时比没有索引花费更多的系统时间。比如在如下字段建立索引应该是不恰当的:1、很少或从不引用的字段;2、逻辑型的字段,如男或女(是或否)等。 综上所述,提高查询效率是以消耗一定的系统资源为代价的,索引不能盲目的建立,必须要有统筹的规划,一定要在“加快查询速度”与“降低修改速度”之间做好平衡,有得必有失,此消则彼长。这是考验一个DBA是否优秀的很重要的指标。 至此,我们一直在说SQLS在维护索引时要消耗系统资源,那么SQLS维护索引时究竟消耗了什么资源?会产生哪些问题?究竟应该才能优化字段的索引? 在上篇中,我们就索引的基本概念和数据查询原理作了详细阐述,知道了建立索引时一定要在“加快查询速度”与“降低修改速度”之间做好平衡,有得必有失,此消则彼长。那么,SQLS维护索引时究竟怎样消耗资源?应该从哪些方面对索引进行管理与优化?以下就从七个方面来回答这些问题。 一、页分裂 微软MOC教导我们:当一个数据页达到了8K容量,如果此时发生插入或更新数据的操作,将导致页的分裂(又名页拆分): 1、有聚集索引的情况下:聚集索引将被插入和更新的行指向特定的页,该页由聚集索引关键字决定; 2、只有堆的情况下:只要有空间就可以插入新的行,但是如果我们对行数据的更新需要更多的空间,以致大于了当前页的可用空间,行就被移到新的页中,并且在原位置留下一个转发指针,指向被移动的新行,如果具有转发指针的行又被移动了,那么原来的指针将重新指向新的位置; 3、如果堆中有非聚集索引,那么尽管插入和更新操作在堆中不会发生页分裂,但是在非聚集索引上仍然产生页分裂。 无论有无索引,大约一半的数据将保留在老页面,而另一半将放入新页面,并且新页面可能被分配到任何可用的页。所以,频繁页分裂,后果很严重,将使物理表产生大量数据碎片,导致直接造成I/O效率的急剧下降,最后,停止SQLS的运行并重建索引将是我们的唯一选择! 二、填充因子 然而在“混沌之初”,就可以在一定程度上避免不愉快出现:在创建索引时,可以为这个索引指定一个填充因子,以便在索引的每个叶级页面上保留一定百分比的空间,将来数据可以进行扩充和减少页分裂。填充因子是从0到100的百分比数值,设为100时表示将数据页填满。只有当不会对数据进行更改时(例如只读表中)才用此设置。值越小则数据页上的空闲空间越大,这样可以减少在索引增长过程中进行页分裂的需要,但这一操作需要占用更多的硬盘空间。 填充因子只在创建索引时执行,索引创建以后,当表中进行数据的添加、删除或更新时,是不会保持填充因子的,如果想在数据页上保持额外的空间,则有悖于使用填充因子的本意,因为随着数据的输入,SQLS必须在每个页上进行页拆分,以保持填充因子指定的空闲空间。因此,只有在表中的数据进行了较大的变动,才可以填充数据页的空闲空间。这时,可以从容的重建索引,重新指定填充因子,重新分布数据。 反之,填充因子指定不当,就会降低数据库的读取性能,其降低量与填充因子设置值成反比。例如,当填充因子的值为50时,数据库的读取性能会降低两倍!所以,只有在表中根据现有数据创建新索引,并且可以预见将来会对这些数据进行哪些更改时,设置填充因子才有意义。 三、两道数学题 假定数据库设计没有问题,那么是否象上篇中分析的那样,当你建立了众多的索引,在查询工作中SQLS就只能按照“最高指示”用索引处理每一个提交的查询呢?答案是否定的! 上篇“数据是怎样被访问的”章节中提到的四种索引方案只是一种静态的、标准的和理论上的分析比较,实际上,将在外,军令有所不从,SQLS几乎完全是“自主”的决定是否使用索引或使用哪一个索引! 这是怎么回事呢? 让我们先来算一道题:如果某表的一条记录在磁盘上占用1000字节(1K)的话,我们对其中10字节的一个字段建立索引,那么该记录对应的索引大小只有10字节(0.01K)。上篇说过,SQLS的最小空间分配单元是“页(Page)”,一个页面在磁盘上占用8K空间,所以一页只能存储8条“记录”,但可以存储800条“索引”。现在我们要从一个有8000条记录的表中检索符合某个条件的记录(有Where子句),如果没有索引的话,我们需要遍历8000条×1000字节/8K字节=1000个页面才能够找到结果。如果在检索字段上有上述索引的话,那么我们可以在8000条×10字节/8K字节=10个页面中就检索到满足条件的索引块,然后根据索引块上的指针逐一找到结果数据块,这样I/O访问量肯定要少得多。 然而有时用索引还不如不用索引快! 同上,如果要无条件检索全部记录(不用Where子句),不用索引的话,需要访问8000条×1000字节/8K字节=1000个页面;而使用索引的话,首先检索索引,访问8000条×10字节/8K字节=10个页面得到索引检索结果,再根据索引检索结果去对应数据页面,由于是检索全部数据,所以需要再访问8000条×1000字节/8K字节=1000个页面将全部数据读取出来,一共访问了1010个页面,这显然不如不用索引快。 SQLS内部有一套完整的数据索引优化技术,在上述情况下,SQLS会自动使用表扫描的方式检索数据而不会使用任何索引。那么SQLS是怎么知道什么时候用索引,什么时候不用索引的呢?因为SQLS除了维护数据信息外,还维护着数据统计信息! 四、统计信息 打开企业管理器,单击“Database”节点,右击Northwind数据库→单击“属性”→选择“Options”选项卡,观察“Settings”下的各项复选项,你发现了什么? 从Settings中我们可以看到,在数据库中,SQLS将默认的自动创建和更新统计信息,这些统计信息包括数据密度和分布信息,正是它们帮助SQLS确定最佳的查询策略:建立查询计划和是否使用索引以及使用什么样的索引。 在创建索引时,SQLS会创建分布数据页来存放有关索引的两种统计信息:分布表和密度表。查询优化器使用这些统计信息估算使用该索引进行查询的成本(Cost),并在此基础上判断该索引对某个特定查询是否有用。 随着表中的数据发生变化,SQLS自动定期更新这些统计信息。采样是在各个数据页上随机进行。从磁盘读取一个数据页后,该数据页上的所有行都被用来更新统计信息。统计信息更新的频率取决于字段或索引中的数据量以及数据更改量。比如,对于有一万条记录的表,当1000个索引键值发生改变时,该表的统计信息便可能需要更新,因为1000 个值在该表中占了10%,这是一个很大的比例。而对于有1千万条记录的表来说,1000个索引值发生更改的意义则可以忽略不计,因此统计信息就不会自动更新。 至于它们帮助SQLS建立查询计划的具体过程,限于篇幅,这里就省略了,请有兴趣的朋友们自己研究。 顺便多说一句,SQLS除了能自动记录统计信息之外,还可以记录服务器中所发生的其它活动的详细信息,包括I/O 统计信息、CPU 统计信息、锁定请求、T-SQL 和 RPC 统计信息、索引和表扫描、警告和引发的错误、数据库对象的创建/除去、连接/断开、存储过程操作、游标操作等等。这些信息的读取、设置请朋友们在SQLS联机帮助文档(SQL Server Books Online)中搜索字符串“Profiler”查找。 五、索引的人工维护 上面讲到,某些不合适的索引将影响到SQLS的性能,随着应用系统的运行,数据不断地发生变化,当数据变化达到某一个程度时将会影响到索引的使用。这时需要用户自己来维护索引。 随着数据行的插入、删除和数据页的分裂,有些索引页可能只包含几页数据,另外应用在执行大量I/O的时候,重建非聚聚集索引可以维护I/O的效率。重建索引实质上是重新组织B树。需要重建索引的情况有: 1) 数据和使用模式大幅度变化; 2)排序的顺序发生改变; 3)要进行大量插入操作或已经完成; 4)使用I/O查询的磁盘读次数比预料的要多; 5)由于大量数据修改,使得数据页和索引页没有充分使用而导致空间的使用超出估算; 6)dbcc检查出索引有问题。 六、索引的使用原则 接近尾声的时候,让我们再从另一个角度认识索引的两个重要属性----唯一性索引和复合性索引。 在设计表的时候,可以对字段值进行某些限制,比如可以对字段进行主键约束或唯一性约束。 主键约束是指定某个或多个字段不允许重复,用于防止表中出现两条完全相同的记录,这样的字段称为主键,每张表都可以建立并且只能建立一个主键,构成主键的字段不允许空值。例如职员表中“身份证号”字段或成绩表中“学号、课程编号”字段组合。 而唯一性约束与主键约束类似,区别只在于构成唯一性约束的字段允许出现空值。 建立在主键约束和唯一性约束上的索引,由于其字段值具有唯一性,于是我们将这种索引叫做“唯一性索引”,如果这个唯一性索引是由两个以上字段的组合建立的,那么它又叫“复合性索引”。 注意,唯一索引不是聚集索引,如果对一个字段建立了唯一索引,你仅仅不能向这个字段输入重复的值。并不妨碍你可以对其它类型的字段也建立一个唯一性索引,它们可以是聚集的,也可以是非聚集的。 唯一性索引保证在索引列中的全部数据是唯一的,不会包含冗余数据。如果表中已经有一个主键约束或者唯一性约束,那么当创建表或者修改表时,SQLS自动创建一个唯一性索引。但出于必须保证唯一性,那么应该创建主键约束或者唯一性键约束,而不是创建一个唯一性索引。当创建唯一性索引时,应该认真考虑这些规则:当在表中创建主键约束或者唯一性键约束时, SQLS钭自动创建一个唯一性索引;如果表中已经包含有数据,那么当创建索引时,SQLS检查表中已有数据的冗余性,如果发现冗余值,那么SQLS就取消该语句的执行,并且返回一个错误消息,确保表中的每一行数据都有一个唯一值。 复合索引就是一个索引创建在两个列或者多个列上。在搜索时,当两个或者多个列作为一个关键值时,最好在这些列上创建复合索引。当创建复合索引时,应该考虑这些规则:最多可以把16个列合并成一个单独的复合索引,构成复合索引的列的总长度不能超过900字节,也就是说复合列的长度不能太长;在复合索引中,所有的列必须来自同一个表中,不能跨表建立复合列;在复合索引中,列的排列顺序是非常重要的,原则上,应该首先定义最唯一的列,例如在(COL1,COL2)上的索引与在(COL2,COL1)上的索引是不相同的,因为两个索引的列的顺序不同;为了使查询优化器使用复合索引,查询语句中的WHERE子句必须参考复合索引中第一个列;当表中有多个关键列时,复合索引是非常有用的;使用复合索引可以提高查询性能,减少在一个表中所创建的索引数量。 综上所述,我们总结了如下索引使用原则: 1)逻辑主键使用唯一的成组索引,对系统键(作为存储过程)采用唯一的非成组索引,对任何外键列采用非成组索引。考虑数据库的空间有多大,表如何进行访问,还有这些访问是否主要用作读写。 2)不要索引memo/note 字段,不要索引大型字段(有很多字符),这样作会让索引占用太多的存储空间。 3)不要索引常用的小型表 4)一般不要为小型数据表设置过多的索引,假如它们经常有插入和删除操作就更别这样作了,SQLS对这些插入和删除操作提供的索引维护可能比扫描表空间消耗更多的时间。 七、大结局 查询是一个物理过程,表面上是SQLS在东跑西跑,其实真正大部分压马路的工作是由磁盘输入输出系统(I/O)完成,全表扫描需要从磁盘上读表的每一个数据页,如果有索引指向数据值,则I/O读几次磁盘就可以了。但是,在随时发生的增、删、改操作中,索引的存在会大大增加工作量,因此,合理的索引设计是建立在对各种查询的分析和预测上的,只有正确地使索引与程序结合起来,才能产生最佳的优化方案。 一般来说建立索引的思路是: (1)主键时常作为where子句的条件,应在表的主键列上建立聚聚集索引,尤其当经常用它作为连接的时候。 (2)有大量重复值且经常有范围查询和排序、分组发生的列,或者非常频繁地被访问的列,可考虑建立聚聚集索引。 (3)经常同时存取多列,且每列都含有重复值可考虑建立复合索引来覆盖一个或一组查询,并把查询引用最频繁的列作为前导列,如果可能尽量使关键查询形成覆盖查询。 (4)如果知道索引键的所有值都是唯一的,那么确保把索引定义成唯一索引。 (5)在一个经常做插入操作的表上建索引时,使用fillfactor(填充因子)来减少页分裂,同时提高并发度降低死锁的发生。如果在只读表上建索引,则可以把fillfactor置为100。 (6)在选择索引字段时,尽量选择那些小数据类型的字段作为索引键,以使每个索引页能够容纳尽可能多的索引键和指针,通过这种方式,可使一个查询必须遍历的索引页面降到最小。此外,尽可能地使用整数为键值,因为它能够提供比任何数据类型都快的访问速度。 SQLS是一个很复杂的系统,让索引以及查询背后的东西真相大白,可以帮助我们更为深刻的了解我们的系统。一句话,索引就象盐,少则无味多则咸。 本篇文章为转载内容。原文链接:https://blog.csdn.net/qq_28052907/article/details/75194926。 该文由互联网用户投稿提供,文中观点代表作者本人意见,并不代表本站的立场。 作为信息平台,本站仅提供文章转载服务,并不拥有其所有权,也不对文章内容的真实性、准确性和合法性承担责任。 如发现本文存在侵权、违法、违规或事实不符的情况,请及时联系我们,我们将第一时间进行核实并删除相应内容。
2023-04-30 23:10:07
97
转载
MySQL
MySQL 是一款普遍的关联型DBMS,我们可以通过一些基础操作来看 MySQL 中的表格基本信息。 首先,我们需要登陆到 MySQL 数据库中。下面是用命令行方式登陆 MySQL 的示例脚本: mysql -h host -u user -p 其中, host 表示 MySQL 数据库服务器 IP 地址或服务器名称, user 是接入 MySQL 数据库所使用的账户名, -p 是要求输入口令,用于校验登陆。 接着,我们可以查看 MySQL 中当前所有的数据库,使用 的命令为: SHOW DATABASES; 然后,我们需要选择一个数据库进行操作,使用的命令为: USE database; 其中, database 表示需要操作的数据库名称。 现在,我们已经成功进入到了 MySQL 数据库中,可以看到库中的表格基本信息,使用命令如下: SHOW TABLES; 执行该命令后,MySQL 将会显示该库中所有的表名称。 最后,我们可以查看特定表中的所有信息,使用的命令如下: DESCRIBE table; 其中, table 表示需要查看的表名称。 通过上述基础操作,我们可以轻松地了解 MySQL 数据库中的基础表信息。
2023-08-18 09:15:20
63
算法侠
MySQL
MySQL 是当前广泛应用的关系型数据库管理系统软件。如果你须要在个人的计算机中开展 MySQL 的开发任务,那么首先要保证计算机中已经装有 MySQL。下面我们就来看一看如何确认电脑是否装有了 MySQL。 1. 查看是否装有了 MySQL 客户端mysql--version MySQL 客户端是接入 MySQL 服务端的软件,如果你没有装有 MySQL 客户端,那么你将无法接入到 MySQL 服务端。在命令行中输入上面的指令,如果系统提示找不到该命令,则说明你还没有装有 MySQL 客户端。 2. 查看是否装有了 MySQL 服务端mysql-u root -p -h localhost MySQL 服务端是 MySQL 数据库的关键,如果你没有装有 MySQL 服务端,那么你将无法利用 MySQL。在命令行中输入上面的指令,如果系统提示找不到该命令,则说明你还没有装有 MySQL 服务端。 3. 查看是否装有了 MySQL 的 Python 插件包import pymysql;print(pymysql.__version__) 对于 Python 开发者来说,他们须要在电脑中装有 MySQL 的 Python 插件包,才能在个人的 Python 项目中利用 MySQL。在 Python 命令行中输入上面的指令,如果系统提示找不到该模块,则说明你还没有装有 MySQL 的 Python 插件包。 通过上述三个步骤,你就可确认出个人的电脑是否已经装有 MySQL。若未装有,可以到 MySQL 的官网上下载相应的软件,并按照提示开展装有。
2023-04-24 15:12:40
49
电脑达人
转载文章
...ket往后台发送日志数据,在这里我们是要做基于SparkStreaming做实时在线统计。那么数据就需要放进消息系统(Kafka)中,我们的Spark Streaming应用程序就会去Kafka中Pull数据过来进行计算和消费,并把计算后的数据放入到持久化系统中(MySQL) 广告点击系统实时分析的意义:因为可以在线实时的看见广告的投放效果,就为广告的更大规模的投入和调整打下了坚实的基础,从而为公司带来最大化的经济回报。 核心需求: 1、实时黑名单动态过滤出有效的用户广告点击行为:因为黑名单用户可能随时出现,所以需要动态更新; 2、在线计算广告点击流量; 3、Top3热门广告; 4、每个广告流量趋势; 5、广告点击用户的区域分布分析 6、最近一分钟的广告点击量; 7、整个广告点击Spark Streaming处理程序724小时运行; 数据格式: 时间、用户、广告、城市等 技术细节: 在线计算用户点击的次数分析,屏蔽IP等; 使用updateStateByKey或者mapWithState进行不同地区广告点击排名的计算; Spark Streaming+Spark SQL+Spark Core等综合分析数据; 使用Window类型的操作; 高可用和性能调优等等; 流量趋势,一般会结合DB等; Spark Core / /package com.tom.spark.SparkApps.sparkstreaming;import java.util.Date;import java.util.HashMap;import java.util.Map;import java.util.Properties;import java.util.Random;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;/ 数据生成代码,Kafka Producer产生数据/public class MockAdClickedStat {/ @param args/public static void main(String[] args) {final Random random = new Random();final String[] provinces = new String[]{"Guangdong", "Zhejiang", "Jiangsu", "Fujian"};final Map<String, String[]> cities = new HashMap<String, String[]>();cities.put("Guangdong", new String[]{"Guangzhou", "Shenzhen", "Dongguan"});cities.put("Zhejiang", new String[]{"Hangzhou", "Wenzhou", "Ningbo"});cities.put("Jiangsu", new String[]{"Nanjing", "Suzhou", "Wuxi"});cities.put("Fujian", new String[]{"Fuzhou", "Xiamen", "Sanming"});final String[] ips = new String[] {"192.168.112.240","192.168.112.239","192.168.112.245","192.168.112.246","192.168.112.247","192.168.112.248","192.168.112.249","192.168.112.250","192.168.112.251","192.168.112.252","192.168.112.253","192.168.112.254",};/ Kafka相关的基本配置信息/Properties kafkaConf = new Properties();kafkaConf.put("serializer.class", "kafka.serializer.StringEncoder");kafkaConf.put("metadeta.broker.list", "Master:9092,Worker1:9092,Worker2:9092");ProducerConfig producerConfig = new ProducerConfig(kafkaConf);final Producer<Integer, String> producer = new Producer<Integer, String>(producerConfig);new Thread(new Runnable() {public void run() {while(true) {//在线处理广告点击流的基本数据格式:timestamp、ip、userID、adID、province、cityLong timestamp = new Date().getTime();String ip = ips[random.nextInt(12)]; //可以采用网络上免费提供的ip库int userID = random.nextInt(10000);int adID = random.nextInt(100);String province = provinces[random.nextInt(4)];String city = cities.get(province)[random.nextInt(3)];String clickedAd = timestamp + "\t" + ip + "\t" + userID + "\t" + adID + "\t" + province + "\t" + city;producer.send(new KeyedMessage<Integer, String>("AdClicked", clickedAd));try {Thread.sleep(50);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} }} }).start();} } package com.tom.spark.SparkApps.sparkstreaming;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.util.ArrayList;import java.util.Arrays;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Set;import java.util.concurrent.LinkedBlockingQueue;import kafka.serializer.StringDecoder;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.sql.DataFrame;import org.apache.spark.sql.Row;import org.apache.spark.sql.RowFactory;import org.apache.spark.sql.hive.HiveContext;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructType;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaPairInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;import org.apache.spark.streaming.kafka.KafkaUtils;import com.google.common.base.Optional;import scala.Tuple2;/ 数据处理,Kafka消费者/public class AdClickedStreamingStats {/ @param args/public static void main(String[] args) {// TODO Auto-generated method stub//好处:1、checkpoint 2、工厂final SparkConf conf = new SparkConf().setAppName("SparkStreamingOnKafkaDirect").setMaster("hdfs://Master:7077/");final String checkpointDirectory = "hdfs://Master:9000/library/SparkStreaming/CheckPoint_Data";JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {public JavaStreamingContext create() {// TODO Auto-generated method stubreturn createContext(checkpointDirectory, conf);} };/ 可以从失败中恢复Driver,不过还需要指定Driver这个进程运行在Cluster,并且在提交应用程序的时候制定--supervise;/JavaStreamingContext javassc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);/ 第三步:创建Spark Streaming输入数据来源input Stream: 1、数据输入来源可以基于File、HDFS、Flume、Kafka、Socket等 2、在这里我们指定数据来源于网络Socket端口,Spark Streaming连接上该端口并在运行的时候一直监听该端口的数据 (当然该端口服务首先必须存在),并且在后续会根据业务需要不断有数据产生(当然对于Spark Streaming 应用程序的运行而言,有无数据其处理流程都是一样的) 3、如果经常在每间隔5秒钟没有数据的话不断启动空的Job其实会造成调度资源的浪费,因为并没有数据需要发生计算;所以 实际的企业级生成环境的代码在具体提交Job前会判断是否有数据,如果没有的话就不再提交Job;///创建Kafka元数据来让Spark Streaming这个Kafka Consumer利用Map<String, String> kafkaParameters = new HashMap<String, String>();kafkaParameters.put("metadata.broker.list", "Master:9092,Worker1:9092,Worker2:9092");Set<String> topics = new HashSet<String>();topics.add("SparkStreamingDirected");JavaPairInputDStream<String, String> adClickedStreaming = KafkaUtils.createDirectStream(javassc, String.class, String.class, StringDecoder.class, StringDecoder.class,kafkaParameters, topics);/因为要对黑名单进行过滤,而数据是在RDD中的,所以必然使用transform这个函数; 但是在这里我们必须使用transformToPair,原因是读取进来的Kafka的数据是Pair<String,String>类型, 另一个原因是过滤后的数据要进行进一步处理,所以必须是读进的Kafka数据的原始类型 在此再次说明,每个Batch Duration中实际上讲输入的数据就是被一个且仅被一个RDD封装的,你可以有多个 InputDStream,但其实在产生job的时候,这些不同的InputDStream在Batch Duration中就相当于Spark基于HDFS 数据操作的不同文件来源而已罢了。/JavaPairDStream<String, String> filteredadClickedStreaming = adClickedStreaming.transformToPair(new Function<JavaPairRDD<String,String>, JavaPairRDD<String,String>>() {public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {/ 在线黑名单过滤思路步骤: 1、从数据库中获取黑名单转换成RDD,即新的RDD实例封装黑名单数据; 2、然后把代表黑名单的RDD的实例和Batch Duration产生的RDD进行Join操作, 准确的说是进行leftOuterJoin操作,也就是说使用Batch Duration产生的RDD和代表黑名单的RDD实例进行 leftOuterJoin操作,如果两者都有内容的话,就会是true,否则的话就是false 我们要留下的是leftOuterJoin结果为false; /final List<String> blackListNames = new ArrayList<String>();JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();jdbcWrapper.doQuery("SELECT FROM blacklisttable", null, new ExecuteCallBack() {public void resultCallBack(ResultSet result) throws Exception {while(result.next()){blackListNames.add(result.getString(1));} }});List<Tuple2<String, Boolean>> blackListTuple = new ArrayList<Tuple2<String,Boolean>>();for(String name : blackListNames) {blackListTuple.add(new Tuple2<String, Boolean>(name, true));}List<Tuple2<String, Boolean>> blacklistFromListDB = blackListTuple; //数据来自于查询的黑名单表并且映射成为<String, Boolean>JavaSparkContext jsc = new JavaSparkContext(rdd.context());/ 黑名单的表中只有userID,但是如果要进行join操作的话就必须是Key-Value,所以在这里我们需要 基于数据表中的数据产生Key-Value类型的数据集合/JavaPairRDD<String, Boolean> blackListRDD = jsc.parallelizePairs(blacklistFromListDB);/ 进行操作的时候肯定是基于userID进行join,所以必须把传入的rdd进行mapToPair操作转化成为符合格式的RDD/JavaPairRDD<String, Tuple2<String, String>> rdd2Pair = rdd.mapToPair(new PairFunction<Tuple2<String,String>, String, Tuple2<String, String>>() {public Tuple2<String, Tuple2<String, String>> call(Tuple2<String, String> t) throws Exception {// TODO Auto-generated method stubString userID = t._2.split("\t")[2];return new Tuple2<String, Tuple2<String,String>>(userID, t);} });JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> joined = rdd2Pair.leftOuterJoin(blackListRDD);JavaPairRDD<String, String> result = joined.filter(new Function<Tuple2<String,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, Boolean>() {public Boolean call(Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> tuple)throws Exception {// TODO Auto-generated method stubOptional<Boolean> optional = tuple._2._2;if(optional.isPresent() && optional.get()){return false;} else {return true;} }}).mapToPair(new PairFunction<Tuple2<String,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, String, String>() {public Tuple2<String, String> call(Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> t)throws Exception {// TODO Auto-generated method stubreturn t._2._1;} });return result;} });//广告点击的基本数据格式:timestamp、ip、userID、adID、province、cityJavaPairDStream<String, Long> pairs = filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {public Tuple2<String, Long> call(Tuple2<String, String> t) throws Exception {String[] splited=t._2.split("\t");String timestamp = splited[0]; //YYYY-MM-DDString ip = splited[1];String userID = splited[2];String adID = splited[3];String province = splited[4];String city = splited[5]; String clickedRecord = timestamp + "_" +ip + "_"+userID+"_"+adID+"_"+province +"_"+city;return new Tuple2<String, Long>(clickedRecord, 1L);} });/ 第4.3步:在单词实例计数为1基础上,统计每个单词在文件中出现的总次数/JavaPairDStream<String, Long> adClickedUsers= pairs.reduceByKey(new Function2<Long, Long, Long>() {public Long call(Long i1, Long i2) throws Exception{return i1 + i2;} });/判断有效的点击,复杂化的采用机器学习训练模型进行在线过滤 简单的根据ip判断1天不超过100次;也可以通过一个batch duration的点击次数判断是否非法广告点击,通过一个batch来判断是不完整的,还需要一天的数据也可以每一个小时来判断。/JavaPairDStream<String, Long> filterClickedBatch = adClickedUsers.filter(new Function<Tuple2<String,Long>, Boolean>() {public Boolean call(Tuple2<String, Long> v1) throws Exception {if (1 < v1._2){//更新一些黑名单的数据库表return false;} else { return true;} }});//filterClickedBatch.print();//写入数据库filterClickedBatch.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {public Void call(JavaPairRDD<String, Long> rdd) throws Exception {rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {//使用数据库连接池的高效读写数据库的方式将数据写入数据库mysql//例如一次插入 1000条 records,使用insertBatch 或 updateBatch//插入的用户数据信息:userID,adID,clickedCount,time//这里面有一个问题,可能出现两条记录的key是一样的,此时需要更新累加操作List<UserAdClicked> userAdClickedList = new ArrayList<UserAdClicked>();while(partition.hasNext()) {Tuple2<String, Long> record = partition.next();String[] splited = record._1.split("\t");UserAdClicked userClicked = new UserAdClicked();userClicked.setTimestamp(splited[0]);userClicked.setIp(splited[1]);userClicked.setUserID(splited[2]);userClicked.setAdID(splited[3]);userClicked.setProvince(splited[4]);userClicked.setCity(splited[5]);userAdClickedList.add(userClicked);}final List<UserAdClicked> inserting = new ArrayList<UserAdClicked>();final List<UserAdClicked> updating = new ArrayList<UserAdClicked>();JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();//表的字段timestamp、ip、userID、adID、province、city、clickedCountfor(final UserAdClicked clicked : userAdClickedList) {jdbcWrapper.doQuery("SELECT clickedCount FROM adclicked WHERE"+ " timestamp =? AND userID = ? AND adID = ?",new Object[]{clicked.getTimestamp(), clicked.getUserID(),clicked.getAdID()}, new ExecuteCallBack() {public void resultCallBack(ResultSet result) throws Exception {// TODO Auto-generated method stubif(result.next()) {long count = result.getLong(1);clicked.setClickedCount(count);updating.add(clicked);} else {inserting.add(clicked);clicked.setClickedCount(1L);} }});}//表的字段timestamp、ip、userID、adID、province、city、clickedCountList<Object[]> insertParametersList = new ArrayList<Object[]>();for(UserAdClicked insertRecord : inserting) {insertParametersList.add(new Object[] {insertRecord.getTimestamp(),insertRecord.getIp(),insertRecord.getUserID(),insertRecord.getAdID(),insertRecord.getProvince(),insertRecord.getCity(),insertRecord.getClickedCount()});}jdbcWrapper.doBatch("INSERT INTO adclicked VALUES(?, ?, ?, ?, ?, ?, ?)", insertParametersList);//表的字段timestamp、ip、userID、adID、province、city、clickedCountList<Object[]> updateParametersList = new ArrayList<Object[]>();for(UserAdClicked updateRecord : updating) {updateParametersList.add(new Object[] {updateRecord.getTimestamp(),updateRecord.getIp(),updateRecord.getUserID(),updateRecord.getAdID(),updateRecord.getProvince(),updateRecord.getCity(),updateRecord.getClickedCount() + 1});}jdbcWrapper.doBatch("UPDATE adclicked SET clickedCount = ? WHERE"+ " timestamp =? AND ip = ? AND userID = ? AND adID = ? "+ "AND province = ? AND city = ?", updateParametersList);} });return null;} });//再次过滤,从数据库中读取数据过滤黑名单JavaPairDStream<String, Long> blackListBasedOnHistory = filterClickedBatch.filter(new Function<Tuple2<String,Long>, Boolean>() {public Boolean call(Tuple2<String, Long> v1) throws Exception {//广告点击的基本数据格式:timestamp,ip,userID,adID,province,cityString[] splited = v1._1.split("\t"); //提取key值String date =splited[0];String userID =splited[2];String adID =splited[3];//查询一下数据库同一个用户同一个广告id点击量超过50次列入黑名单//接下来 根据date、userID、adID条件去查询用户点击广告的数据表,获得总的点击次数//这个时候基于点击次数判断是否属于黑名单点击int clickedCountTotalToday = 81 ;if (clickedCountTotalToday > 50) {return true;}else {return false ;} }});//map操作,找出用户的idJavaDStream<String> blackListuserIDBasedInBatchOnhistroy =blackListBasedOnHistory.map(new Function<Tuple2<String,Long>, String>() {public String call(Tuple2<String, Long> v1) throws Exception {// TODO Auto-generated method stubreturn v1._1.split("\t")[2];} });//有一个问题,数据可能重复,在一个partition里面重复,这个好办;//但多个partition不能保证一个用户重复,需要对黑名单的整个rdd进行去重操作。//rdd去重了,partition也就去重了,一石二鸟,一箭双雕// 找出了黑名单,下一步就写入黑名单数据库表中JavaDStream<String> blackListUniqueuserBasedInBatchOnhistroy = blackListuserIDBasedInBatchOnhistroy.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {// TODO Auto-generated method stubreturn rdd.distinct();} });// 下一步写入到数据表中blackListUniqueuserBasedInBatchOnhistroy.foreachRDD(new Function<JavaRDD<String>, Void>() {public Void call(JavaRDD<String> rdd) throws Exception {rdd.foreachPartition(new VoidFunction<Iterator<String>>() {public void call(Iterator<String> t) throws Exception {// TODO Auto-generated method stub//插入的用户信息可以只包含:useID//此时直接插入黑名单数据表即可。//写入数据库List<Object[]> blackList = new ArrayList<Object[]>();while(t.hasNext()) {blackList.add(new Object[]{t.next()});}JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();jdbcWrapper.doBatch("INSERT INTO blacklisttable values (?)", blackList);} });return null;} });/广告点击累计动态更新,每个updateStateByKey都会在Batch Duration的时间间隔的基础上进行广告点击次数的更新, 更新之后我们一般都会持久化到外部存储设备上,在这里我们存储到MySQL数据库中/JavaPairDStream<String, Long> updateStateByKeyDSteam = filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {public Tuple2<String, Long> call(Tuple2<String, String> t)throws Exception {String[] splited=t._2.split("\t");String timestamp = splited[0]; //YYYY-MM-DDString ip = splited[1];String userID = splited[2];String adID = splited[3];String province = splited[4];String city = splited[5]; String clickedRecord = timestamp + "_" +ip + "_"+userID+"_"+adID+"_"+province +"_"+city;return new Tuple2<String, Long>(clickedRecord, 1L);} }).updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {public Optional<Long> call(List<Long> v1, Optional<Long> v2)throws Exception {// v1:当前的Key在当前的Batch Duration中出现的次数的集合,例如{1,1,1,。。。,1}// v2:当前的Key在以前的Batch Duration中积累下来的结果;Long clickedTotalHistory = 0L; if(v2.isPresent()){clickedTotalHistory = v2.get();}for(Long one : v1) {clickedTotalHistory += one;}return Optional.of(clickedTotalHistory);} });updateStateByKeyDSteam.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {public Void call(JavaPairRDD<String, Long> rdd) throws Exception {rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {//使用数据库连接池的高效读写数据库的方式将数据写入数据库mysql//例如一次插入 1000条 records,使用insertBatch 或 updateBatch//插入的用户数据信息:timestamp、adID、province、city//这里面有一个问题,可能出现两条记录的key是一样的,此时需要更新累加操作List<AdClicked> AdClickedList = new ArrayList<AdClicked>();while(partition.hasNext()) {Tuple2<String, Long> record = partition.next();String[] splited = record._1.split("\t");AdClicked adClicked = new AdClicked();adClicked.setTimestamp(splited[0]);adClicked.setAdID(splited[1]);adClicked.setProvince(splited[2]);adClicked.setCity(splited[3]);adClicked.setClickedCount(record._2);AdClickedList.add(adClicked);}final List<AdClicked> inserting = new ArrayList<AdClicked>();final List<AdClicked> updating = new ArrayList<AdClicked>();JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();//表的字段timestamp、ip、userID、adID、province、city、clickedCountfor(final AdClicked clicked : AdClickedList) {jdbcWrapper.doQuery("SELECT clickedCount FROM adclickedcount WHERE"+ " timestamp = ? AND adID = ? AND province = ? AND city = ?",new Object[]{clicked.getTimestamp(), clicked.getAdID(),clicked.getProvince(), clicked.getCity()}, new ExecuteCallBack() {public void resultCallBack(ResultSet result) throws Exception {// TODO Auto-generated method stubif(result.next()) {long count = result.getLong(1);clicked.setClickedCount(count);updating.add(clicked);} else {inserting.add(clicked);clicked.setClickedCount(1L);} }});}//表的字段timestamp、ip、userID、adID、province、city、clickedCountList<Object[]> insertParametersList = new ArrayList<Object[]>();for(AdClicked insertRecord : inserting) {insertParametersList.add(new Object[] {insertRecord.getTimestamp(),insertRecord.getAdID(),insertRecord.getProvince(),insertRecord.getCity(),insertRecord.getClickedCount()});}jdbcWrapper.doBatch("INSERT INTO adclickedcount VALUES(?, ?, ?, ?, ?)", insertParametersList);//表的字段timestamp、ip、userID、adID、province、city、clickedCountList<Object[]> updateParametersList = new ArrayList<Object[]>();for(AdClicked updateRecord : updating) {updateParametersList.add(new Object[] {updateRecord.getClickedCount(),updateRecord.getTimestamp(),updateRecord.getAdID(),updateRecord.getProvince(),updateRecord.getCity()});}jdbcWrapper.doBatch("UPDATE adclickedcount SET clickedCount = ? WHERE"+ " timestamp =? AND adID = ? AND province = ? AND city = ?", updateParametersList);} });return null;} });/ 对广告点击进行TopN计算,计算出每天每个省份Top5排名的广告 因为我们直接对RDD进行操作,所以使用了transfomr算子;/updateStateByKeyDSteam.transform(new Function<JavaPairRDD<String,Long>, JavaRDD<Row>>() {public JavaRDD<Row> call(JavaPairRDD<String, Long> rdd) throws Exception {JavaRDD<Row> rowRDD = rdd.mapToPair(new PairFunction<Tuple2<String,Long>, String, Long>() {public Tuple2<String, Long> call(Tuple2<String, Long> t)throws Exception {// TODO Auto-generated method stubString[] splited=t._1.split("_");String timestamp = splited[0]; //YYYY-MM-DDString adID = splited[3];String province = splited[4];String clickedRecord = timestamp + "_" + adID + "_" + province;return new Tuple2<String, Long>(clickedRecord, t._2);} }).reduceByKey(new Function2<Long, Long, Long>() {public Long call(Long v1, Long v2) throws Exception {// TODO Auto-generated method stubreturn v1 + v2;} }).map(new Function<Tuple2<String,Long>, Row>() {public Row call(Tuple2<String, Long> v1) throws Exception {// TODO Auto-generated method stubString[] splited=v1._1.split("_");String timestamp = splited[0]; //YYYY-MM-DDString adID = splited[3];String province = splited[4];return RowFactory.create(timestamp, adID, province, v1._2);} });StructType structType = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("timestamp", DataTypes.StringType, true),DataTypes.createStructField("adID", DataTypes.StringType, true),DataTypes.createStructField("province", DataTypes.StringType, true),DataTypes.createStructField("clickedCount", DataTypes.LongType, true)));HiveContext hiveContext = new HiveContext(rdd.context());DataFrame df = hiveContext.createDataFrame(rowRDD, structType);df.registerTempTable("topNTableSource");DataFrame result = hiveContext.sql("SELECT timestamp, adID, province, clickedCount, FROM"+ " (SELECT timestamp, adID, province,clickedCount, "+ "ROW_NUMBER() OVER(PARTITION BY province ORDER BY clickeCount DESC) rank "+ "FROM topNTableSource) subquery "+ "WHERE rank <= 5");return result.toJavaRDD();} }).foreachRDD(new Function<JavaRDD<Row>, Void>() {public Void call(JavaRDD<Row> rdd) throws Exception {// TODO Auto-generated method stubrdd.foreachPartition(new VoidFunction<Iterator<Row>>() {public void call(Iterator<Row> t) throws Exception {// TODO Auto-generated method stubList<AdProvinceTopN> adProvinceTopN = new ArrayList<AdProvinceTopN>();while(t.hasNext()) {Row row = t.next();AdProvinceTopN item = new AdProvinceTopN();item.setTimestamp(row.getString(0));item.setAdID(row.getString(1));item.setProvince(row.getString(2));item.setClickedCount(row.getLong(3));adProvinceTopN.add(item);}// final List<AdProvinceTopN> inserting = new ArrayList<AdProvinceTopN>();// final List<AdProvinceTopN> updating = new ArrayList<AdProvinceTopN>();JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();Set<String> set = new HashSet<String>();for(AdProvinceTopN item: adProvinceTopN){set.add(item.getTimestamp() + "_" + item.getProvince());}//表的字段timestamp、adID、province、clickedCountArrayList<Object[]> deleteParametersList = new ArrayList<Object[]>();for(String deleteRecord : set) {String[] splited = deleteRecord.split("_");deleteParametersList.add(new Object[]{splited[0],splited[1]});}jdbcWrapper.doBatch("DELETE FROM adprovincetopn WHERE timestamp = ? AND province = ?", deleteParametersList);//表的字段timestamp、ip、userID、adID、province、city、clickedCountList<Object[]> insertParametersList = new ArrayList<Object[]>();for(AdProvinceTopN insertRecord : adProvinceTopN) {insertParametersList.add(new Object[] {insertRecord.getClickedCount(),insertRecord.getTimestamp(),insertRecord.getAdID(),insertRecord.getProvince()});}jdbcWrapper.doBatch("INSERT INTO adprovincetopn VALUES (?, ?, ?, ?)", insertParametersList);} });return null;} });/ 计算过去半个小时内广告点击的趋势 广告点击的基本数据格式:timestamp、ip、userID、adID、province、city/filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {public Tuple2<String, Long> call(Tuple2<String, String> t)throws Exception {String splited[] = t._2.split("\t");String adID = splited[3];String time = splited[0]; //Todo:后续需要重构代码实现时间戳和分钟的转换提取。此处需要提取出该广告的点击分钟单位return new Tuple2<String, Long>(time + "_" + adID, 1L);} }).reduceByKeyAndWindow(new Function2<Long, Long, Long>() {public Long call(Long v1, Long v2) throws Exception {// TODO Auto-generated method stubreturn v1 + v2;} }, new Function2<Long, Long, Long>() {public Long call(Long v1, Long v2) throws Exception {// TODO Auto-generated method stubreturn v1 - v2;} }, Durations.minutes(30), Durations.milliseconds(5)).foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {public Void call(JavaPairRDD<String, Long> rdd) throws Exception {// TODO Auto-generated method stubrdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {public void call(Iterator<Tuple2<String, Long>> partition)throws Exception {List<AdTrendStat> adTrend = new ArrayList<AdTrendStat>();// TODO Auto-generated method stubwhile(partition.hasNext()) {Tuple2<String, Long> record = partition.next();String[] splited = record._1.split("_");String time = splited[0];String adID = splited[1];Long clickedCount = record._2;/ 在插入数据到数据库的时候具体需要哪些字段?time、adID、clickedCount; 而我们通过J2EE技术进行趋势绘图的时候肯定是需要年、月、日、时、分这个维度的,所以我们在这里需要 年月日、小时、分钟这些时间维度;/AdTrendStat adTrendStat = new AdTrendStat();adTrendStat.setAdID(adID);adTrendStat.setClickedCount(clickedCount);adTrendStat.set_date(time); //Todo:获取年月日adTrendStat.set_hour(time); //Todo:获取小时adTrendStat.set_minute(time);//Todo:获取分钟adTrend.add(adTrendStat);}final List<AdTrendStat> inserting = new ArrayList<AdTrendStat>();final List<AdTrendStat> updating = new ArrayList<AdTrendStat>();JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();//表的字段timestamp、ip、userID、adID、province、city、clickedCountfor(final AdTrendStat trend : adTrend) {final AdTrendCountHistory adTrendhistory = new AdTrendCountHistory();jdbcWrapper.doQuery("SELECT clickedCount FROM adclickedtrend WHERE"+ " date =? AND hour = ? AND minute = ? AND AdID = ?",new Object[]{trend.get_date(), trend.get_hour(), trend.get_minute(),trend.getAdID()}, new ExecuteCallBack() {public void resultCallBack(ResultSet result) throws Exception {// TODO Auto-generated method stubif(result.next()) {long count = result.getLong(1);adTrendhistory.setClickedCountHistoryLong(count);updating.add(trend);} else { inserting.add(trend);} }});}//表的字段date、hour、minute、adID、clickedCountList<Object[]> insertParametersList = new ArrayList<Object[]>();for(AdTrendStat insertRecord : inserting) {insertParametersList.add(new Object[] {insertRecord.get_date(),insertRecord.get_hour(),insertRecord.get_minute(),insertRecord.getAdID(),insertRecord.getClickedCount()});}jdbcWrapper.doBatch("INSERT INTO adclickedtrend VALUES(?, ?, ?, ?, ?)", insertParametersList);//表的字段date、hour、minute、adID、clickedCountList<Object[]> updateParametersList = new ArrayList<Object[]>();for(AdTrendStat updateRecord : updating) {updateParametersList.add(new Object[] {updateRecord.getClickedCount(),updateRecord.get_date(),updateRecord.get_hour(),updateRecord.get_minute(),updateRecord.getAdID()});}jdbcWrapper.doBatch("UPDATE adclickedtrend SET clickedCount = ? WHERE"+ " date =? AND hour = ? AND minute = ? AND AdID = ?", updateParametersList);} });return null;} });;/ Spark Streaming 执行引擎也就是Driver开始运行,Driver启动的时候是位于一条新的线程中的,当然其内部有消息循环体,用于 接收应用程序本身或者Executor中的消息,/javassc.start();javassc.awaitTermination();javassc.close();}private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf) {// If you do not see this printed, that means the StreamingContext has been loaded// from the new checkpointSystem.out.println("Creating new context");// Create the context with a 5 second batch sizeJavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));ssc.checkpoint(checkpointDirectory);return ssc;} }class JDBCWrapper {private static JDBCWrapper jdbcInstance = null;private static LinkedBlockingQueue<Connection> dbConnectionPool = new LinkedBlockingQueue<Connection>();static {try {Class.forName("com.mysql.jdbc.Driver");} catch (ClassNotFoundException e) {// TODO Auto-generated catch blocke.printStackTrace();} }public static JDBCWrapper getJDBCInstance() {if(jdbcInstance == null) {synchronized (JDBCWrapper.class) {if(jdbcInstance == null) {jdbcInstance = new JDBCWrapper();} }}return jdbcInstance; }private JDBCWrapper() {for(int i = 0; i < 10; i++){try {Connection conn = DriverManager.getConnection("jdbc:mysql://Master:3306/sparkstreaming","root", "root");dbConnectionPool.put(conn);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();} } }public synchronized Connection getConnection() {while(0 == dbConnectionPool.size()){try {Thread.sleep(20);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} }return dbConnectionPool.poll();}public int[] doBatch(String sqlText, List<Object[]> paramsList){Connection conn = getConnection();PreparedStatement preparedStatement = null;int[] result = null;try {conn.setAutoCommit(false);preparedStatement = conn.prepareStatement(sqlText);for(Object[] parameters: paramsList) {for(int i = 0; i < parameters.length; i++){preparedStatement.setObject(i + 1, parameters[i]);} preparedStatement.addBatch();}result = preparedStatement.executeBatch();conn.commit();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {if(preparedStatement != null) {try {preparedStatement.close();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} }if(conn != null) {try {dbConnectionPool.put(conn);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} }}return result; }public void doQuery(String sqlText, Object[] paramsList, ExecuteCallBack callback){Connection conn = getConnection();PreparedStatement preparedStatement = null;ResultSet result = null;try {preparedStatement = conn.prepareStatement(sqlText);for(int i = 0; i < paramsList.length; i++){preparedStatement.setObject(i + 1, paramsList[i]);} result = preparedStatement.executeQuery();try {callback.resultCallBack(result);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();} } catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {if(preparedStatement != null) {try {preparedStatement.close();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} }if(conn != null) {try {dbConnectionPool.put(conn);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} }} }}interface ExecuteCallBack {void resultCallBack(ResultSet result) throws Exception;}class UserAdClicked {private String timestamp;private String ip;private String userID;private String adID;private String province;private String city;private Long clickedCount;public String getTimestamp() {return timestamp;}public void setTimestamp(String timestamp) {this.timestamp = timestamp;}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}public String getUserID() {return userID;}public void setUserID(String userID) {this.userID = userID;}public String getAdID() {return adID;}public void setAdID(String adID) {this.adID = adID;}public String getProvince() {return province;}public void setProvince(String province) {this.province = province;}public String getCity() {return city;}public void setCity(String city) {this.city = city;}public Long getClickedCount() {return clickedCount;}public void setClickedCount(Long clickedCount) {this.clickedCount = clickedCount;} }class AdClicked {private String timestamp;private String adID;private String province;private String city;private Long clickedCount;public String getTimestamp() {return timestamp;}public void setTimestamp(String timestamp) {this.timestamp = timestamp;}public String getAdID() {return adID;}public void setAdID(String adID) {this.adID = adID;}public String getProvince() {return province;}public void setProvince(String province) {this.province = province;}public String getCity() {return city;}public void setCity(String city) {this.city = city;}public Long getClickedCount() {return clickedCount;}public void setClickedCount(Long clickedCount) {this.clickedCount = clickedCount;} }class AdProvinceTopN {private String timestamp;private String adID;private String province;private Long clickedCount;public String getTimestamp() {return timestamp;}public void setTimestamp(String timestamp) {this.timestamp = timestamp;}public String getAdID() {return adID;}public void setAdID(String adID) {this.adID = adID;}public String getProvince() {return province;}public void setProvince(String province) {this.province = province;}public Long getClickedCount() {return clickedCount;}public void setClickedCount(Long clickedCount) {this.clickedCount = clickedCount;} }class AdTrendStat {private String _date;private String _hour;private String _minute;private String adID;private Long clickedCount;public String get_date() {return _date;}public void set_date(String _date) {this._date = _date;}public String get_hour() {return _hour;}public void set_hour(String _hour) {this._hour = _hour;}public String get_minute() {return _minute;}public void set_minute(String _minute) {this._minute = _minute;}public String getAdID() {return adID;}public void setAdID(String adID) {this.adID = adID;}public Long getClickedCount() {return clickedCount;}public void setClickedCount(Long clickedCount) {this.clickedCount = clickedCount;} }class AdTrendCountHistory{private Long clickedCountHistoryLong;public Long getClickedCountHistoryLong() {return clickedCountHistoryLong;}public void setClickedCountHistoryLong(Long clickedCountHistoryLong) {this.clickedCountHistoryLong = clickedCountHistoryLong;} } 本篇文章为转载内容。原文链接:https://blog.csdn.net/tom_8899_li/article/details/71194434。 该文由互联网用户投稿提供,文中观点代表作者本人意见,并不代表本站的立场。 作为信息平台,本站仅提供文章转载服务,并不拥有其所有权,也不对文章内容的真实性、准确性和合法性承担责任。 如发现本文存在侵权、违法、违规或事实不符的情况,请及时联系我们,我们将第一时间进行核实并删除相应内容。
2023-02-14 19:16:35
297
转载
MySQL
MySQL是一款普及的关系型数据库管理系统,它支持多种连接方式。常规连接和SSH连接是其中两种常见方式。下面我们将介绍它们之间的区别。 常规连接: // 连接MySQL服务器 $host = 'localhost'; // 主机名 $user = 'root'; // 账号 $password = '123456'; // 口令 $database = 'test'; // 数据库名称 $conn = mysqli_connect($host, $user, $password, $database); // 检测连接是否成功 if (!$conn) { die('连接不成功: ' . mysqli_connect_error()); } // 查询数据 $sql = 'SELECT FROM user'; $result = mysqli_query($conn, $sql); // 处理查询结果 if (mysqli_num_rows($result) >0) { while ($row = mysqli_fetch_assoc($result)) { echo '账号: ' . $row['username'] . ', 口令: ' . $row['password'] . ' '; } } else { echo '没有结果'; } // 关闭连接 mysqli_close($conn); 常规连接的代码比较简单,用mysqli_connect()函数连接MySQL服务器,然后用mysqli_query()函数执行查询,最后用mysqli_fetch_assoc()函数处理查询结果。这种连接方式适用于在本地开发和测试。 SSH连接: // 连接MySQL服务器 $host = 'localhost'; // 主机名 $user = 'root'; // 账号 $password = '123456'; // 口令 $database = 'test'; // 数据库名称 // SSH设置 $ssh_host = 'ssh.example.com'; // SSH主机名 $ssh_user = 'sshuser'; // SSH账号 $ssh_password = 'sshpassword'; // SSH口令 $ssh_port = 22; // SSH端口 // SSH到MySQL服务器 $connection = ssh2_connect($ssh_host, $ssh_port); if (ssh2_auth_password($connection, $ssh_user, $ssh_password)) { // SSH认证成功 $tunnel = ssh2_tunnel($connection, $host, 3306); // 连接MySQL服务器 $conn = mysqli_connect('127.0.0.1', $user, $password, $database, '3306', $tunnel); // 检测连接是否成功 if (!$conn) { die('连接不成功: ' . mysqli_connect_error()); } // 查询数据 $sql = 'SELECT FROM user'; $result = mysqli_query($conn, $sql); // 处理查询结果 if (mysqli_num_rows($result) >0) { while ($row = mysqli_fetch_assoc($result)) { echo '账号: ' . $row['username'] . ', 口令: ' . $row['password'] . ' '; } } else { echo '没有结果'; } // 关闭连接 mysqli_close($conn); } else { // SSH认证不成功 die('SSH认证不成功'); } SSH连接的代码相对复杂,需要用ssh2_connect()函数连接SSH服务器,用ssh2_auth_password()函数进行SSH认证,然后用ssh2_tunnel()函数创建隧道,最后用mysqli_connect()函数连接MySQL服务器和数据库。SSH连接的好处是可以通过SSH隧道连接到远程的MySQL服务器,提升了数据传输的安全性。
2023-06-22 12:09:56
134
码农
MySQL
MySQL , MySQL是一种广泛使用的关系型数据库管理系统(RDBMS),由Oracle公司开发并维护。它以SQL语言为基础,提供数据的存储、检索、更新和管理等功能。在本文中,用户需要判断MySQL数据库是否存在,这是进行数据库操作的基础步骤。 命令行工具 , 命令行工具是指通过文本界面而非图形用户界面(GUI)与操作系统或应用程序交互的方式。在MySQL环境下,命令行工具通常指的是MySQL客户端程序,用户可以通过输入特定命令来执行各种数据库操作,如连接服务器、查询数据库列表等。 phpMyAdmin , phpMyAdmin是一个基于PHP编写的开源Web应用程序,专门用于管理和操作MySQL/MariaDB数据库。通过Web浏览器界面,用户可以直观地创建、修改、删除数据库及表结构,以及执行SQL查询、导入/导出数据等操作。在本文中,用户借助phpMyAdmin查看MySQL数据库列表以确定某个数据库是否存在。 mysqli_select_db() , mysqli_select_db()是PHP中MySQL Improved (mysqli)扩展提供的一个函数,用于在已建立的数据库连接中选择(切换至)指定的数据库。如果该数据库存在并且成功切换,函数返回TRUE;否则返回FALSE。在本文中,当命令行工具和phpMyAdmin无法验证数据库是否存在时,开发者可以使用这个函数在PHP代码中进一步验证数据库的存在性。
2023-01-14 14:51:54
105
代码侠
MySQL
将数据传输到MySQL数据库中是数据处理的重要步骤。为方便说明,假设我们要将一个名为“test”的数据表创建到指定MySQL服务器的数据库中。 第一步是连接到MySQL服务器。使用以下PHP代码进行连接: $db_host = "localhost"; // MySQL服务器地址 $db_user = "root"; // MySQL用户名 $db_pass = "password"; // MySQL用户密码 $db_name = "database_name"; // 数据库名 $conn = mysqli_connect($db_host, $db_user, $db_pass, $db_name); if (!$conn) { die("连接错误:" . mysqli_connect_error()); } 连接成功后,我们可以将数据传输到MySQL数据库中。将以下PHP代码放到您的脚本中: $sql = "CREATE TABLE test ( id INT(6) UNSIGNED AUTO_INCREMENT PRIMARY KEY, name VARCHAR(30) NOT NULL, email VARCHAR(50) NOT NULL, reg_date TIMESTAMP )"; if (mysqli_query($conn, $sql)) { echo "数据表test创建成功"; } else { echo "创建数据表错误: " . mysqli_error($conn); } 以上代码将在您的MySQL数据库中创建名为test的数据表。该表包含id、name、email和reg_date列。id列将自动递增,并将作为主键。name和email列不能为NULL,而reg_date列将保存创建行的时间戳。 上传数据到MySQL数据库中可能需要一些额外的数据处理。您可以从CSV文件、文本文件、XML文件、JSON数据或通过表格收集的数据中读取数据,然后将其转换为MySQL可以处理的常规数据格式。使用以下PHP代码将数据上传到MySQL数据库中: $myfile = fopen("data.txt", "r") or die("不能打开文件!"); while (!feof($myfile)) { $line = fgets($myfile); $line_arr = explode(",", $line); $name = $line_arr[0]; $email = $line_arr[1]; $sql = "INSERT INTO test (name, email) VALUES ('$name', '$email')"; mysqli_query($conn, $sql); } fclose($myfile); echo "上传数据到MySQL数据库成功"; 以上代码将从文本文件中获取数据,并将其上传到MySQL数据库的test数据表中。请注意,我们将数据数组中的第一和第二个元素映射到MySQL表test中的name和email列。 当您上传或更新数据时,请记得在您的PHP脚本中使用适当的错误处理和安全措施,以确保数据库安全。
2024-01-19 14:50:17
333
数据库专家
PostgreSQL
PostgreSQL:如何创建一个可以显示值出来的索引? 引言 PostgreSQL是一款强大的开源关系型数据库管理系统,支持多种存储引擎和索引类型。这篇文儿呢,主要是手把手教你咋在PostgreSQL这个数据库里头,捣鼓出一个能够秀出具体数值的索引,让你的数据查询嗖嗖快。 创建索引的基本步骤 在PostgreSQL中,我们可以使用CREATE INDEX语句来创建一个新的索引。以下是一些基本步骤: 步骤一:选择要创建索引的表 首先,我们需要选择要创建索引的表。例如,如果我们有一个名为employees的表,我们可以在其中创建索引: sql CREATE TABLE employees ( id serial primary key, name varchar(50), department varchar(50) ); 步骤二:选择要创建索引的列 接下来,我们需要选择要创建索引的列。例如,如果我们想要根据name列创建一个索引,我们可以这样做: sql CREATE INDEX idx_employees_name ON employees (name); 在这个例子中,idx_employees_name是我们给索引起的名字,ON employees (name)表示我们在employees表的name列上创建了一个新的索引。 步骤三:创建索引 最后,我们可以通过执行上述SQL语句来创建索引。要是没啥意外,PostgreSQL会亲口告诉我们一个好消息,那就是索引已经妥妥地创建成功啦! sql CREATE INDEX idx_employees_name ON employees (name); 如何查看已创建的索引? 如果你想知道哪些索引已经被创建在你的表上,你可以使用pg_indexes系统视图。这个视图可厉害了,它囊括了所有的索引信息,从索引的名字,到它所对应绑定的表,再到索引的各种类型,啥都一清二楚,明明白白。 sql SELECT FROM pg_indexes WHERE tablename = 'employees'; 这将会返回一个结果集,其中包含了employees表上的所有索引的信息。 创建可以显示值的索引 在PostgreSQL中,创建一个可以显示值的索引很简单。我们只需要在创建索引的时候指定我们想要使用的索引类型即可。目前,PostgreSQL支持多种索引类型,包括B-tree、哈希、GiST、SP-GiST和GIN等。不同的索引类型就像不同类型的工具,各有各的适用场合。所以,你得根据自己的实际需求,像挑选合适的工具一样,去选择最适合你的索引类型。别忘了,对症下药才能发挥最大效用! 以下是一个创建B-tree索引的例子: sql CREATE INDEX idx_employees_name_btree ON employees (name); 在这个例子中,idx_employees_name_btree是我们给索引起的名字,ON employees (name)表示我们在employees表的name列上创建了一个新的B-tree索引。如果你想创建不同类型的索引,那就简单啦,只需要把“btree”这个词儿换成你心水的索引类型就大功告成啦!就像是换衣服一样,根据你的需求选择不同的“款式”就行。 总结 创建一个可以显示值的索引并不难。其实,你只需要用一句“CREATE INDEX”命令,就能轻松搞定创建索引的事儿。具体来说,就是在这句命令里头,告诉系统你要在哪个表上建索引、打算对哪一列建立索引,还有你希望用哪种类型的索引,一切就OK啦!就像是在跟数据库说:“嗨,我在某某表的某某列上,想要创建一个这样那样的索引!”另外,你还可以使用pg_indexes系统视图来查看已创建的所有索引。希望这篇文章能对你有所帮助!
2023-11-30 10:13:56
261
半夏微凉_t
DorisDB
...DB是一个强大的开源数据库系统,它以其高效的数据处理能力和可扩展性受到了许多开发者的喜爱。然而,随着数据量的增长,我们可能会遇到一些性能问题。本文将详细介绍如何在DorisDB中进行SQL语句的性能调优。 二、优化SQL语句的基本原则 优化SQL语句的原则主要有三个:尽可能减少数据读取,提高查询效率,降低磁盘I/O操作。 三、如何减少数据读取? 1. 索引优化 索引是加速查询的重要工具。在DorisDB中,我们可以使用CREATE INDEX语句创建索引。例如: sql CREATE INDEX idx_name ON table_name(name); 这个语句会在table_name表上根据name字段创建一个索引。 2. 避免全表扫描 全表扫描是最耗时的操作之一。因此,我们应该尽可能避免全表扫描。例如,如果我们需要查找age大于18的所有用户,我们可以使用如下语句: sql SELECT FROM user WHERE age > 18; 如果age字段没有索引,那么查询将会进行全表扫描。为了提高查询效率,我们应该为age字段创建索引。 四、如何提高查询效率? 1. 分区设计 分区设计可以显著提高查询效率。在DorisDB这个数据库里,我们可以灵活运用PARTITION BY命令,就像给表分门别类一样进行分区操作,让数据管理更加井井有条。例如: sql CREATE TABLE table_name ( id INT, name STRING, ... ) PARTITIONED BY (id); 这个语句会根据id字段对table_name表进行分区。 2. 查询优化器 DorisDB的查询优化器可以根据查询语句自动选择最优的执行计划。但是,有时候我们需要手动调整优化器的行为。例如,我们可以使用EXPLAIN语句查看优化器选择的执行计划: sql EXPLAIN SELECT FROM table_name WHERE age > 18; 如果我们发现优化器选择的执行计划不是最优的,我们可以使用FORCE_INDEX语句强制优化器使用特定的索引: sql SELECT FROM table_name FORCE INDEX(idx_age) WHERE age > 18; 五、如何降低磁盘I/O操作? 1. 使用流式计算 流式计算是一种高效的处理大量数据的方式。在DorisDB中,我们可以使用INSERT INTO SELECT语句进行流式计算: sql INSERT INTO new_table SELECT FROM old_table WHERE age > 18; 这个语句会从old_table表中选择age大于18的数据,并插入到new_table表中。 2. 使用Bloom Filter Bloom Filter是一种空间换时间的数据结构,它可以快速判断一个元素是否存在于集合中。在DorisDB这个数据库里,我们有个小妙招,就是用Bloom Filter这家伙来帮咱们提前把一些肯定不存在的结果剔除掉。这样一来,就能有效减少磁盘I/O操作,让查询速度嗖嗖的提升。 总结,通过以上的方法,我们可以有效地提高DorisDB的查询性能。当然啦,这只是入门级别的小窍门,具体的优化方案咱们还得根据实际情况灵活变通,不断调整优化~希望这篇文章能够帮助你更好地理解和使用DorisDB。
2023-05-04 20:31:52
524
雪域高原-t
.net
...中,我们经常会使用到数据库操作,为了提升代码复用性和降低耦合度,通常会封装一个通用的数据访问层,如SqlHelper类。不过在实际动手操作的时候,咱们免不了会撞上一些突如其来的小插曲,特别是当我们要把数据塞进去的时候。嘿,伙计们,这篇文稿将会拽着你们的手,一起蹦跶进这个问题的奇妙世界。咱会借助那些实实在在的实例代码,再配上超级详细的解说,像剥洋葱那样一层层揭开这个谜团的神秘面纱,让一切变得清清楚楚、明明白白! 2. SqlHelper类的封装与基本使用 首先,让我们来看看如何在.NET框架下封装一个基础的SqlHelper类(这里以C为例): csharp public class SqlHelper { private static string connectionString = "YourConnectionString"; public static int ExecuteNonQuery(string sql, params SqlParameter[] parameters) { using (SqlConnection connection = new SqlConnection(connectionString)) { SqlCommand command = new SqlCommand(sql, connection); command.Parameters.AddRange(parameters); connection.Open(); return command.ExecuteNonQuery(); } } } 这个类提供了一个ExecuteNonQuery方法,用于执行非查询型SQL语句,比如INSERT、UPDATE或DELETE。现在假设我们要插入一条用户记录: csharp SqlParameter idParam = new SqlParameter("@Id", SqlDbType.Int) { Value = 1 }; SqlParameter nameParam = new SqlParameter("@Name", SqlDbType.NVarChar, 50) { Value = "John Doe" }; int rowsAffected = SqlHelper.ExecuteNonQuery( "INSERT INTO Users(Id, Name) VALUES (@Id, @Name)", idParam, nameParam); 3. 插入数据时可能遇到的问题及解决方案 - 问题一:参数化SQL错误 在调用SqlHelper.ExecuteNonQuery方法执行插入操作时,如果SQL语句编写错误或者参数未正确绑定,就可能导致插入失败。比如说,假如你在表结构里把字段名写错了,或者参数名跟SQL语句里的占位符对不上号,程序就跟你闹脾气,罢工不干活了,没法正常运行。 csharp // 错误示例:字段名写错 SqlParameter idParam = ...; SqlParameter nameParam = ...; int rowsAffected = SqlHelper.ExecuteNonQuery( "INSERT INTO Users(ID, Nam) VALUES (@Id, @Name)", // 'Nam' 应为 'Name' idParam, nameParam); 解决方案是仔细检查并修正SQL语句以及参数绑定。 - 问题二:主键冲突 如果尝试插入已存在的主键值,数据库会抛出异常。例如,我们的用户表中有自增主键Id,但仍尝试插入一个已存在的Id值。 csharp SqlParameter idParam = new SqlParameter("@Id", SqlDbType.Int) { Value = 1 }; // 假设Id=1已存在 ... int rowsAffected = SqlHelper.ExecuteNonQuery(...); // 这里会抛出主键冲突异常 对于此问题,我们需要在设计时考虑是否允许插入已存在的主键,如果不允许,则需要在代码层面做校验,或者利用数据库自身的约束来处理。 4. 深入思考与讨论 在封装SqlHelper类的过程中,我们不仅要注意其功能实现,更要关注异常处理和性能优化。比如,当我们进行插入数据这个操作时,可以考虑引入事务机制,这样就能保证数据稳稳当当地保持一致性。再者,对于那些随时可能蹦跶出来的各种异常情况,咱们得及时把它们逮住,并且提供一些实实在在、能让人一看就明白的错误提示,这样开发者就能像雷达一样迅速找准问题所在了。此外,我们还可以扩展此类,加入预编译SQL命令等功能,进一步提高数据操作效率。 总结来说,封装SqlHelper类确实极大地便利了我们的数据库操作,但在实际应用过程中,尤其是插入数据等关键操作时,我们必须对可能遇到的问题保持警惕,并采取有效的预防和解决措施。通过不断的实践和探索,我们可以让封装的SqlHelper类更加健壮和完善,更好地服务于项目开发。
2023-04-19 11:32:32
549
梦幻星空_
Linux
...巨头。在二零一六年,SQL Server这位数据库界的重量级选手,突然间跳出舒适区,登上Linux的热场,给程序员和运维人员带来了意想不到的创新惊喜。嘿,今天咱们来聊聊怎么在那个经典的CentOS 7系统上给SQL Server 2016找个家,一步步操作起来,超简单! 1.2 SQL Server on Linux的背景 - 在2016年12月,微软宣布将SQL Server移植到Linux,这一举措标志着数据库技术的开放和包容性增强。 - 对于那些依赖SQL Server的企业来说,能在Linux上运行意味着更大的灵活性和成本节省。 第二章:系统需求与兼容性 2.1 硬件与软件环境 - CentOS 7.5要求稳定的硬件资源,包括足够的内存和CPU性能。 - 至少需要64位的Linux内核版本,因为SQL Server 2016是64位的。 bash 检查系统版本和CPU架构 uname -a - 验证你的CentOS版本是否满足要求,确保支持的内核模块已安装。 2.2 兼容性概述 - SQL Server 2016 for Linux支持多种架构,包括x86和x86_64,但不支持ARM架构。 - 在决定安装前,确认你的硬件是兼容的,可以通过dpkg --print-architecture或cat /proc/cpuinfo检查。 第三章:安装准备 3.1 添加官方仓库 - 在CentOS 7中,我们需要添加Microsoft的Yum源才能获取SQL Server的安装包。 bash wget -qO- https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add - echo "deb [arch=amd64,signed-by=/usr/share/keyrings/microsoft-archive-keyring.gpg] https://packages.microsoft.com/repos/mssql-release/centos7_amd64 yum stable" | sudo tee /etc/yum.repos.d/mssql-release.repo - 更新yum仓库以便安装最新版本。 bash sudo yum update -y 3.2 选择安装类型 - SQL Server 2016提供了两种安装选项:Evaluation(免费试用版,适合开发和测试)和Community(商业版,需要订阅)。 bash sudo yum install msopengauss msopengauss-client msopengauss-devel -y - 或者,选择Community版,可能需要替换msopengauss为mssql-server。 第四章:安装与配置 4.1 安装SQL Server - 使用yum安装SQL Server,记得替换版本号和实例名称。 bash sudo yum install mssql-server-2016 -y sudo systemctl start msopengauss - 如果是社区版,可能会看到类似mssql-server的包名。 4.2 配置和初始化 - 使用mssql-conf工具进行基本配置,如设置监听端口和密码。 bash sudo opt/mssql/bin/mssql-conf setup - 选择“Custom Configuration”,根据需要自定义安装。 4.3 数据库实例管理 - 创建数据库实例,例如: bash sudo opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P 'your_password' -Q "CREATE DATABASE YourDatabaseName" - 更改默认的sa用户密码: bash sudo opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P 'old_password' -Q "ALTER LOGIN sa WITH PASSWORD = 'new_password'" 第五章:连接与验证 5.1 命令行工具 - 使用sqlcmd工具连接到新安装的数据库。 bash sqlcmd -S localhost -U sa -P 'your_password' - 验证连接成功后,可以执行查询操作。 5.2图形化工具 - 可以选择安装SQL Server Management Studio(SSMS)的Linux版本,或者使用第三方工具如ssms-linux,来进行更直观的管理。 结论 6.1 总结与展望 - CentOS 7确实可以安装SQL Server 2016,尽管它已经不再是最新版本,但对于那些还在使用或需要兼容旧版本的用户来说,这是一个可行的选择。 - 未来,随着技术的迭代,SQL Server on Linux的体验会越来越完善,跨平台的数据库管理将更加无缝。 在这个快速发展的技术时代,适应变化并充分利用新的工具是关键。真心希望这篇指南能像老朋友一样,手把手教你轻松搞定在Linux大本营里安装和打理SQL Server 2016的那些事儿,让你畅游在数据库的海洋里无阻无碍。嘿,想找最潮的解决招数对吧?记得翻翻官方手册,那里有新鲜出炉的支援和超实用的建议!
2024-04-11 11:07:55
96
醉卧沙场_
Superset
...Superset进行数据可视化分析的过程中,我们时常会遇到需要根据自身需求调整配置文件的情况。然而,有时候会出现这么个情况,明明咱已经捣鼓了那个superset_config.py文件,也重新启动了服务,结果却发现做的改动压根没起作用。哎呀,这种时候真是让人头疼又满心狐疑,你说气不气人?这篇文章呢,咱会手把手、一步步带着大家,用实例代码演示和深度讨论的方式,把这个问题掰开揉碎了讲明白,而且还会给大家献上实实在在的解决妙招! 2. 配置文件修改概述 Superset的自定义配置通常保存在superset_config.py中,这是一个用户可以根据自身需求扩展或覆盖默认配置的地方。例如,我们要修改数据库连接信息: python from superset import conf 修改默认数据库连接 conf.set('SQLALCHEMY_DATABASE_URI', 'postgresql://username:password@localhost/superset_db') 3. 问题重现与常见原因分析 假设你已按照上述方式修改了数据库连接字符串,但重启服务后发现仍连接到旧的数据库。此时,可能的原因有以下几点: - (1)配置文件路径不正确:Superset启动时并没有加载你修改的配置文件。 - (2)环境变量未更新:如果Superset是通过环境变量引用配置文件,那么更改环境变量的值后可能未被系统识别。 - (3)配置未生效:某些配置项在服务启动后不能动态改变,需要完全重启服务才能生效。 - (4)缓存问题:Superset存在部分配置缓存,未及时清除导致新配置未生效。 4. 解决方案与操作步骤 (1) 确认配置文件路径及加载情况 确保Superset启动命令正确指向你修改的配置文件。例如,如果你在终端执行如下命令启动Superset: bash export PYTHONPATH=/path/to/your/superset/ venv/bin/python superset run -p 8088 --with-threads --reload --debugger 请确认这里的PYTHONPATH设置是否正确。若Superset通过环境变量读取配置,也需检查相应环境变量的设置。 (2) 清理并完全重启服务 在完成配置文件修改后,不仅要停止当前运行的Superset服务,还要确保所有相关的子进程也被清理干净。例如,在Unix-like系统中,可以使用pkill -f superset命令终止所有相关进程,然后重新启动服务。 (3) 检查和处理配置缓存 对于某些特定的配置,Superset可能会在内存中缓存它们。嘿,遇到这种情况的时候,你可以试试清理一下Superset的缓存,或者重启一下相关的服务部件,就像是数据库连接池那些家伙,让它们重新焕发活力。 (4) 验证配置加载 在Superset日志中查找有关配置加载的信息,确认新配置是否成功加载。例如: bash INFO:root:Loaded your LOCAL configuration at [/path/to/your/superset/superset_config.py] 5. 思考与探讨 当我们遇到类似“配置修改后未生效”的问题时,作为开发者,我们需要遵循一定的排查逻辑:首先确认配置文件的加载路径和内容;其次,理解配置生效机制,包括是否支持热加载,是否存在缓存等问题;最后,通过查看日志等方式验证配置的实际应用情况。 在这个过程中,不仅锻炼了我们的问题定位能力,同时也加深了对Superset工作原理的理解。而面对这种看似让人挠头的问题,只要我们沉住气,像侦探破案那样一步步抽丝剥茧,就一定能找到问题的核心秘密,最后妥妥地把事情搞定,实现我们想要的结果。 6. 结语 调试和优化Superset配置是一个持续的过程,每个环节都充满了挑战与乐趣。记住了啊,每当你遇到困惑或者开始一场探索之旅,其实都是在朝着更牛、更个性化的数据分析道路迈出关键的一大步呢!希望本文能帮你顺利解决Superset配置修改后重启服务未生效的问题,助你在数据海洋中畅游无阻。
2024-01-24 16:27:57
240
冬日暖阳
Greenplum
...reenplum中的数据类型和精度:一次深入实践之旅 1. 引言 在大数据领域,Greenplum作为一款开源且高度可扩展的MPP(大规模并行处理)数据库,以其卓越的大规模数据分析能力深受广大用户的青睐。在实际操作时,我们可能会遇到需要对表格里的数据类型或者精度进行微调的情况。这背后的原因五花八门,可能是为了更有效地利用存储空间,让查询速度嗖嗖提升;也可能是为了更好地适应业务发展,满足那些新冒出来的需求点。这篇内容,咱们会手把手地通过一些实实在在的代码实例,带你逐个步骤掌握如何在Greenplum里搞定这个操作。同时,咱们还会边走边聊,一起探讨在这个过程中可能会踩到的坑以及相应的填坑大法。 2. 理解Greenplum的数据类型与精度 在Greenplum中,每列都有特定的数据类型,如整数(integer)、浮点数(real)、字符串(varchar)等,而精度则是针对数值型数据类型的特性,如numeric(10,2)表示最大整数位数为10,小数位数为2。理解这些基础概念是进行调整的前提。 sql -- 创建一个包含不同数据类型的表 CREATE TABLE test_data_types ( id INT, name VARCHAR(50), salary NUMERIC(10,2) ); 3. 调整Greenplum中的数据类型 场景一:改变数据类型 例如,假设我们的salary字段原先是INTEGER类型,现在希望将其更改为NUMERIC以支持小数点后的精度。 sql -- 首先,我们需要确保所有数据都能成功转换到新类型 ALTER TABLE test_data_types ALTER COLUMN salary TYPE NUMERIC; -- 或者,如果需要同时指定精度 ALTER TABLE test_data_types ALTER COLUMN salary TYPE NUMERIC(10,2); 注意,修改数据类型时必须保证现有数据能成功转换到新的类型,否则操作会失败。在执行上述命令前,最好先运行一些验证查询来检查数据是否兼容。 场景二:增加或减少数值类型的精度 若要修改salary字段的小数位数,可以如下操作: sql -- 增加salary字段的小数位数 ALTER TABLE test_data_types ALTER COLUMN salary TYPE NUMERIC(15,4); -- 减少salary字段的小数位数,系统会自动四舍五入 ALTER TABLE test_data_types ALTER COLUMN salary TYPE NUMERIC(10,1); 4. 考虑的因素与挑战 - 数据完整性与一致性:在调整数据类型或精度时,务必谨慎评估变更可能带来的影响,比如精度降低可能导致的数据丢失。 - 性能开销:某些数据类型之间的转换可能带来额外的CPU计算资源消耗,尤其是在大表上操作时。 - 索引重建:更改数据类型后,原有的索引可能不再适用,需要重新创建。 - 事务与并发控制:对于大型生产环境,需规划合适的维护窗口期,以避免在数据类型转换期间影响其他业务流程。 5. 结语 调整Greenplum中的数据类型和精度是一个涉及数据完整性和性能优化的关键步骤。在整个这个过程中,我们得像个侦探一样,深入地摸透业务需求,把数据验证做得像查户口似的,仔仔细细,一个都不能放过。同时,咱们还要像艺术家设计蓝图那样,精心策划每一次的变更方案。为啥呢?就是为了在让系统跑得飞快的同时,保证咱的数据既整齐划一又滴水不漏。希望这篇东西里提到的例子和讨论能实实在在帮到你,让你在用Greenplum处理数据的时候,感觉就像个武林高手,轻松应对各种挑战,游刃有余,毫不费力。
2024-02-18 11:35:29
396
彩虹之上
MyBatis
...MyBatis框架中SQL语句在XML中的编写错误及其修正方法后,我们可以进一步关注数据库操作安全与性能优化的最新实践和理论研究。近期,随着Spring Boot 2.5对MyBatis整合支持的持续完善,开发者们在实际项目中如何更高效、安全地运用MyBatis进行复杂查询及动态SQL构建成为热门话题。 例如,InfoQ的一篇文章“深入解析MyBatis动态SQL的最佳实践与潜在风险”,不仅详细阐述了如何避免文中提及的基础语法错误与动态SQL拼接问题,还介绍了最新的动态元素如, 等在处理批量更新或复杂条件查询时的应用技巧,以及如何通过结合注解方式进行SQL映射以提升代码可读性。 同时,数据库性能优化领域,一篇名为“利用MyBatis进行SQL性能调优”的技术博客强调了SQL执行计划分析的重要性,并指导读者如何借助MyBatis的日志输出功能,结合数据库自身的性能分析工具(如MySQL的EXPLAIN),对查询语句进行深度优化,从而确保系统在大数据量下仍能保持高效率运行。 此外,针对数据完整性保护,业界专家在《Java持久层设计模式》一书中提出了一系列策略,包括合理使用MyBatis的事务管理机制,以及通过预编译SQL、参数化查询等方式防止SQL注入攻击,这些内容都为提高MyBatis应用的安全性提供了有力指导。 综上所述,无论是紧跟技术前沿,了解MyBatis框架的最新发展,还是深入探究SQL性能优化与安全防护的实战经验,都是每一位使用MyBatis进行持久层开发的程序员不可忽视的重要延伸阅读内容。通过不断学习与实践,我们能够更好地驾驭MyBatis,实现系统的稳定、高效和安全运行。
2024-02-04 11:31:26
52
岁月如歌
Hive
...的一个重要组件,是大数据处理的重要工具之一。你知道的,就像那些超级复杂的机器,Hive有时候也会有点小状况,比方说,日志文件突然就出点岔子了,对吧?这不仅会影响数据的正常处理,还可能对我们的生产环境造成困扰。嘿,朋友们,今天咱们就来聊聊一个超级实用的话题:Hive的日志文件为啥会突然“罢工”,还有怎么找出问题的症结并把它修好,就像医生检查身体一样精准! 二、Hive日志文件的重要性 Hive的日志文件记录了查询执行的过程,包括但不限于SQL语句、执行计划、错误信息等。这些信息在调试问题、优化性能时至关重要。例如,当我们遇到查询运行缓慢或者失败时,日志文件就是我们寻找答案的第一线线索: sql EXPLAIN EXTENDED SELECT FROM table; 查看这个命令的执行计划,可以帮助我们理解为何查询效率低下。 三、日志文件损坏的原因 1. 磁盘故障 硬件故障是最直接的原因,如硬盘损坏或RAID阵列失效。 2. 运行异常 Hive在执行过程中如果遇到内存溢出、网络中断等情况,可能导致日志文件不完整。 3. 系统崩溃 操作系统崩溃或Hive服务突然停止也可能导致日志文件未被妥善关闭。 4. 管理操作失误 误删、覆盖日志文件也是常见的情况。 四、诊断Hive日志文件损坏 1. 使用Hive CLI检查 bash hive> show metastore_db_location; 查看Metastore的数据库位置,通常位于HDFS上,检查是否存在异常或损坏的文件。 2. 检查HDFS状态 bash hdfs dfs -ls /path/to/hive/logs 如果发现文件缺失或状态异常,可能是HDFS的问题。 3. 日志审查 打开Hive的错误日志文件,如hive.log,查看是否有明显的错误信息。 五、修复策略 1. 重新创建日志文件 如果只是临时的文件损坏,可以通过重启Hive服务或重启Metastore服务来生成新的日志。 2. 数据恢复 如果是磁盘故障导致的文件丢失,可能需要借助专业的数据恢复工具,但成功的概率较低。 3. 修复HDFS 如果是HDFS的问题,可以尝试修复文件系统,或者备份并替换损坏的文件。 4. 定期备份 为了避免类似问题,定期备份Hive的日志文件和Metastore数据是必要的。 六、预防措施 - 增强硬件监控,及时发现并处理潜在的硬件问题。 - 设置合理的资源限制,避免因内存溢出导致的日志丢失。 - 建立定期备份机制,出现问题时能快速恢复。 总结 Hive日志文件损坏可能会带来不少麻烦,但只要我们理解其重要性,掌握正确的诊断和修复方法,就能在遇到问题时迅速找到解决方案。你知道吗,老话说得好,“防患于未然”,要想让Hive这个大家伙稳稳当当的,关键就在于咱们得养成勤快的保养习惯,定期检查和打理。希望这篇小文能像老朋友一样,给你点拨一二,轻松搞定Hive日志文件出问题的烦心事。
2024-06-06 11:04:27
815
风中飘零
PostgreSQL
一、数据表索引过多导致查询性能下降 在我们日常的数据库开发过程中,我们都希望能够通过创建索引来提高查询效率。这是因为索引就像是数据库的一张超级导航图,能够迅速找到你要的数据藏在哪里,这样一来,就不用大海捞针似的把整个表格从头到尾扫一遍了。这可真是个大大的提速秘诀,让查询速度嗖嗖地提升起来!然而,有时候我们会遇到这么个情况:明明我们辛辛苦苦创建了一堆索引,本以为查询速度能嗖嗖提升,结果却不如人意,反而还冒出了一些小插曲,让人头疼不已。这就是因为我们的索引创建得太多了。 二、索引的创建原则 那么,我们应该怎样正确地创建索引呢?首先,我们需要明确一点,不是所有的字段都适合创建索引。一般来说,我们只需要在经常用于WHERE子句、JOIN子句或者ORDER BY子句的字段上创建索引。这么做的妙处在于,只有当需要用到这些字段的数据时,系统才会聪明地调用索引,这样一来,就能有效地避开那些没必要的花费,让整个过程更“轻盈”、更高效。 1. 使用explain命令分析SQL语句 为了更好地了解索引对于查询的影响,我们可以使用explain命令来分析SQL语句。这个命令能让我们像看漫画书一样,瞧瞧查询执行的“剧本”,一目了然地看到哪些字段正在被索引这位幕后英雄助力,又有哪些字段还在等待被发掘利用。这样我们就可以根据实际情况来决定是否需要创建索引。 sql EXPLAIN SELECT FROM users WHERE age > 20; 上面的SQL语句将会返回一个表格,其中包含了查询的执行计划。我们可以看到,age字段被使用到了索引,而name字段没有被使用到索引。 2. 观察SQL语句的执行情况 除了使用explain命令外,我们还可以直接观察SQL语句的执行情况,来判断是否需要创建索引。咱们可以翻翻数据库的日志文件,或者使使劲儿数据库监控工具这把“神器”,瞧瞧SQL语句执行花了多久、CPU被占用了多少、磁盘I/O的情况怎么样,这些信息都能一目了然。要是你发现某个SQL语句运行老半天还在转悠,或者CPU占用噌噌往上涨得离谱,那很可能就是因为你还没给它创建索引。 三、解决方法 知道了上述的原因后,我们就可以采取一些措施来解决这个问题了。首先,我们可以尽量减少索引的数量。这意味着我们需要更加精确地选择要创建索引的字段,避免无谓的开销。其次,咱们还可以时不时地给索引做个“大扫除”,重新构建一下,或者考虑用上一些特殊的索引技巧。比如,就像覆盖索引啦,唯一索引这些小玩意儿,都能让数据库更好地运转起来。最后,我们还可以琢磨一下采用数据库分区或者分片这招,让查询的压力能够分散开来,这样一来就不会把所有的“重活”都压在一块儿了。 四、总结 总的来说,索引是一个非常重要的概念,它能够极大地提高数据库的查询效率。然而,如果索引创建得过多,就会导致查询性能下降。因此,我们在创建索引时,一定要考虑到实际情况,避免盲目创建。同时呢,咱们也得不断给自己充电,学点新鲜的知识,掌握更多的技能才行。这样一来,面对各种难缠的问题,咱们就能更加游刃有余地解决它们了。只有这样,我们才能够成为一名真正的数据库专家。
2023-06-12 18:34:17
502
青山绿水-t
Hibernate
...Hibernate与数据库表访问权限问题深度解析 1. 引言 在企业级应用开发中,Hibernate作为一款强大的ORM框架,极大地简化了Java对象与关系型数据库之间的映射操作。然而,在实际做项目的时候,我们常常会碰到关于数据库表权限分配的难题,尤其在那种用户多、角色乱七八糟的复杂系统里头,这个问题更是频繁出现。这篇文儿,咱们要接地气地聊聊Hibernate究竟是怎么巧妙应对和化解这类权限问题的,并且会结合实际的代码例子,掰开了揉碎了给你细细道来。 2. Hibernate与数据库权限概述 在使用Hibernate进行持久化操作时,开发者需要理解其底层是如何与数据库交互的。默认情况下,Hibernate是通过连接数据库的用户身份执行所有CRUD(创建、读取、更新、删除)操作的。这就意味着,这个用户的数据库权限将直接影响到应用能否成功完成业务逻辑。 3. 权限控制的重要性 假设我们的系统中有不同角色的用户,如管理员、普通用户等,他们对同一张数据表的访问权限可能大相径庭。例如,管理员可以完全操作用户表,而普通用户只能查看自己的信息。这个时候,咱们就得在Hibernate这个环节上动点小心思,搞个更精细化的权限管理,确保不会因为权限不够而整出什么操作失误啊,数据泄露之类的问题。 4. Hibernate中的权限控制实现策略 (a) 配置文件控制 首先,最基础的方式是通过配置数据库连接参数,让不同的用户角色使用不同的数据库账号登录,每个账号具有相应的权限限制。在Hibernate的hibernate.cfg.xml配置文件中,我们可以设置如下: xml admin secret (b) 动态SQL与拦截器 对于更复杂的场景,可以通过自定义拦截器或者HQL动态SQL来实现权限过滤。例如,当我们查询用户信息时,可以添加一个拦截器判断当前登录用户是否有权查看其他用户的数据: java public class AuthorizationInterceptor extends EmptyInterceptor { @Override public String onPrepareStatement(String sql) { // 获取当前登录用户ID Long currentUserId = getCurrentUserId(); return super.onPrepareStatement(sql + " WHERE user_id = " + currentUserId); } } (c) 数据库视图与存储过程 另外,还可以结合数据库自身的安全性机制,如创建只读视图或封装权限控制逻辑于存储过程中。Hibernate照样能搞定映射视图或者调用存储过程来干活儿,这样一来,我们就能在数据库这一层面对权限实现滴水不漏的管控啦。 5. 实践中的思考与挑战 尽管Hibernate提供了多种方式实现权限控制,但在实际应用中仍需谨慎对待。比如,你要是太过于依赖那个拦截器,就像是把所有鸡蛋放在一个篮子里,代码的侵入性就会蹭蹭上涨,维护起来能让你头疼到怀疑人生。而如果选择直接在数据库层面动手脚做权限控制,虽然听起来挺高效,但特别是在那些视图或者存储过程复杂得让人眼花缭乱的情况下,性能可是会大打折扣的。 因此,在设计权限控制系统时,我们需要根据系统的具体需求,结合Hibernate的功能特性以及数据库的安全机制,综合考虑并灵活运用各种策略,以达到既能保证数据安全,又能优化性能的目标。 6. 结语 总之,数据库表访问权限管理是构建健壮企业应用的关键一环,Hibernate作为 ORM 框架虽然不能直接提供全面的权限控制功能,但通过合理利用其扩展性和与数据库的良好配合,我们可以实现灵活且高效的权限控制方案。在这个历程里,理解、探索和实践就像是我们不断升级打怪的“能量饮料”,让我们一起在这场技术的大冒险中并肩前进,勇往直前。
2023-09-21 08:17:56
418
夜色朦胧
站内搜索
用于搜索本网站内部文章,支持栏目切换。
知识学习
实践的时候请根据实际情况谨慎操作。
随机学习一条linux命令:
timeout duration command
- 执行命令并在指定时间后终止它。
推荐内容
推荐本栏目内的其它文章,看看还有哪些文章让你感兴趣。
2023-04-28
2023-08-09
2023-06-18
2023-04-14
2023-02-18
2023-04-17
2024-01-11
2023-10-03
2023-09-09
2023-06-13
2023-08-07
2023-03-11
历史内容
快速导航到对应月份的历史文章列表。
随便看看
拉到页底了吧,随便看看还有哪些文章你可能感兴趣。
时光飞逝
"流光容易把人抛,红了樱桃,绿了芭蕉。"