GENGEN
主页
vuepress
  • GIT命令
  • python+django
  • vue cli搭建项目
  • babel es6转换es5
  • docker aliyun配置
  • npm 配置
  • linux 常用命令
  • Ubuntu 下Linux 命令
  • github
  • gitee
  • csdn
  • 关于我
主页
vuepress
  • GIT命令
  • python+django
  • vue cli搭建项目
  • babel es6转换es5
  • docker aliyun配置
  • npm 配置
  • linux 常用命令
  • Ubuntu 下Linux 命令
  • github
  • gitee
  • csdn
  • 关于我
  • java基础

    • JDK8 函数式编程
    • JDK8 新特性之Date-Time
    • Servlet 源码分析
    • ArrayList 源码
    • LinkedList 源码
    • HashMap 源码
    • String 源码
    • BigDecimal 源码
    • java 类的加载
    • Class 源码
    • Synchronized锁升级
    • 事务的传播机制
    • knowledge
  • JAVA WEB

    • Java Servlet
    • 权限设计
    • logback日志的链路追踪
  • DATABASE

    • MySQL EXPLAIN详解
    • MySQL 索引
    • MySQL 表锁、行锁
    • MySQL ACID与transcation
    • 分布式事务
    • MySQL MVCC机制
    • Mysql 乐观锁与悲观锁
    • 分布式锁1 数据库分布式锁
    • 分布式锁2 Redis分布式锁
    • 分布式锁3 ZK分布式锁
  • SpringCloud

    • SpringCloud服务注册中心之Eureka
    • SpringCloud服务注册中心之Zookeeper
    • SpringCloud服务调用之Ribbon
    • SpringCloud服务调用之OpenFeign
    • SpringCloud服务降级之Hystrix
    • SpringCloud服务网关之Gateway
    • SpringCloud Config分布式配置中心
    • SpringCloud服务总线之Bus
    • SpringCloud消息驱动之Stream
    • SpringCloud链路追踪之Sleuth
    • SpringCloud Alibaba Nacos
    • SpringCloud Alibaba Sentinel
  • Spring

    • SpringBoot
    • Spring-data-jpa入门
    • SpringCloud问题
    • dispatcherServlet 源码分析
    • @SpringBootApplication注解内部实现与原理
    • spring启动初始化初始化
  • 中间件

    • 分布式协调服务器Zookeeper
    • 服务治理Dubbo
    • 分布式配置管理平台Apollo
    • 消息中间件框架Kafka
    • 分布式调度平台ElasticJob
    • 可视化分析工具Kibana
    • ElacticSearch 基础
    • ElacticSearch进阶
    • ElacticSearch集成
  • 环境部署

    • 应用容器引擎Docker
    • DockerCompose服务编排
    • 负载均衡Nginx
    • Nginx的安装配置
    • K8S基础
  • 代码片段

    • listener 监听模式
    • spingboot 整合redis
    • XSS过滤
    • profile的使用
    • ConfigurationProperties注解
  • 设计模式

    • 工厂模式
    • 单例模式
    • 装饰者模式
    • 适配器模式
    • 模板方法模式
    • 观察者模式
  • 读书笔记

    • 《Spring in Action 4》 读书笔记
    • 《高性能mysql》 读书笔记
  • NoSQL

    • Redis基础
    • Redis高级
    • Redis集群
    • Redis应用
  • MQ

    • rabbitMQ基础
    • rabbitMQ高级
    • rabbitMQ集群
  • JVM

    • JVM体系架构概述
    • 堆参数调整
    • GC 分代收集算法
    • JVM 垃圾回收器
    • JVM 相关问题
  • JUC

    • JUC总览
    • volatile关键字
    • CAS
    • ABA问题
    • collections包下线程安全的集合类
    • Lock 锁
    • LockSupport
    • AQS
    • Fork/Join分支框架
    • JUC tools
    • BlockingQueue 阻塞队列
    • Executor 线程池
    • CompletableFuture
    • 死锁以及问题定位分析
  • Shell

    • shell命令
    • shell基础
  • Activiti

    • IDEA下的Activiti HelloWord
    • 流程定义的CRUD
    • 流程实例的执行
    • 流程变量
  • VUE

    • vue基础
    • vue router
    • Vuex
    • Axios 跨域
    • dialog 弹出框使用
    • vue 动态刷新页面
    • vue 封装分页组件
    • vue 动态菜单
    • vue 常用传值
  • Solidity 智能合约

    • Solidity 基础
    • Solidity ERC-20
    • Solidity 101
  • English

    • 时态

Elastic-Job-分布式调度框架运用

  • 官方文档:http://elasticjob.io/docs/elastic-job-lite

分片概念

  • 任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。(多个独立进程,分别处理部分数据。)

  • 设置好分片策略,Elastic-Job会将分片项配置各个运行中的作业服务器。开发者需要自行处理分片项与真实数据的对应关系。

Elastic-Job

  • Elastic-Job 是一个分布式的任务调度平台。Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。

  • Elastic-Job会动态的分片分片情况,将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。当其中一台服务器崩溃,Elastic-Job会重新分配分片。

  • 如果不想执行分片,可以把分片策略设置为1,当使用多余1台服务器执行作业时,Elastic-Job会将以1主n从的方式执行。一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。

与springboot 2.2.2.RELEASE 整合

引入依赖

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>

zk注册中心配置

@Configuration
@ConditionalOnExpression("'${spring.elasticjob.serverList}'.length() > 0")
public class JobRegistryCenterConfig {

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${spring.elasticjob.serverList}") final String serverList,
                                             @Value("${spring.elasticjob.namespace}") final String namespace){
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }

}

application.properties配置

server.port=8767
spring.application.name=scheduler-service
#  zookeeper注册中心
spring.elasticjob.serverList = 127.0.0.1:2181
spring.elasticjob.namespace = elastic-job-lite-springboot

xml配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd
                        ">
    <!--注册中心-->
    <reg:zookeeper id="regCenter1" server-lists="${spring.elasticjob.serverList}" namespace="${spring.elasticjob.namespace}"
                   base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
    <!--任务bean start-->
    <!--分片模式,两台服务器执行不同的case:-->
    <job:simple id="firstJob" class="com.elastic.job.exc.FirstJob" registry-center-ref="regCenter1" cron="0/5 * * * * ?"
                sharding-total-count="2" sharding-item-parameters="0=A,1=B" />
    <!--配置主从模式,只有一台机器运行-->
    <job:simple id="singleJob" class="com.elastic.job.exc.FirstJob" registry-center-ref="regCenter1" cron="0/5 * * * * ?"
                sharding-total-count="1" sharding-item-parameters="0=single" />

    <!--任务bean end-->
</beans>

启动入口引入elasticJob.xml

@SpringBootApplication
@ImportResource(locations = "classpath*:elasticJob.xml")
public class ElasticJobApplication {

    public static void main(String[] args) {
        SpringApplication.run(ElasticJobApplication.class, args);
    }

}

Simple类型 JOB实现

  • 分片
@Component
public class FirstJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        List<String> list;
        switch (shardingContext.getShardingItem()){
            case 0:
                list = Arrays.asList("A1,A2,A3");
                System.out.println(String.format("--Thread ID:%s,任务总篇数:%s,当前分页片:%s,当前参数:%s," +
                                "当前任务名称:%s,当前任务结果:%s",Thread.currentThread().getId(),shardingContext.getShardingTotalCount(),
                        shardingContext.getShardingItem(),shardingContext.getShardingParameter(),shardingContext.getJobName(),
                        JSON.toJSONString(list)));
                break;
            case 1:
                list = Arrays.asList("B1,B2");
                System.out.println(String.format("--Thread ID:%s,任务总篇数:%s,当前分页片:%s,当前参数:%s," +
                                "当前任务名称:%s,当前任务结果:%s",Thread.currentThread().getId(),shardingContext.getShardingTotalCount(),
                        shardingContext.getShardingItem(),shardingContext.getShardingParameter(),shardingContext.getJobName(),
                        JSON.toJSONString(list)));
                break;
            default:
                System.out.println("do nothing!");
        }

    }
}
  • 主从
@Component
public class SingleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(String.format("SINGLE-JOB--Thread ID:%s,任务总篇数:%s,当前分页片:%s,当前参数:%s," +
                        "当前任务名称:%s,当前任务结果:%s",Thread.currentThread().getId(),shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),shardingContext.getShardingParameter(),shardingContext.getJobName(),
                shardingContext.getJobParameter()));
    }
}

Dataflow类型 JOB实现

  • 流式处理数据,实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。
  • 流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去;
@Component
public class MyDataflowJob implements DataflowJob<Person> {
    /**
     * 抓取
     *
     * @param shardingContext
     * @return
     */
    @Override
    public List<Person> fetchData(ShardingContext shardingContext) {
        List<Person> personList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Person p = new Person();
            p.setAge(i);
            p.setName("w" + i + "s");
            personList.add(p);
        }
        
        return personList;
    }

    /**
     * 处理
     *
     * @param shardingContext
     * @param list
     */
    @Override
    public void processData(ShardingContext shardingContext, List<Person> list) {
        for (Person person : list) {
            System.out.println("获取的姓名:" + person.getName() + "年龄:" + person.getAge());
        }
    }
}
  • xml中注册bean
<job:dataflow  id="myDataflowJob1" class="com.elastic.job.exc.MyDataflowJob" registry-center-ref="regCenter1" cron="0/5 * * * * ?"
                   sharding-total-count="1" sharding-item-parameters="0=person" />

总结

  • 上面配置,一个简单的分布式调度平台就完成了,启动ZK,修改端口号,分别起两个服务A,B。
  • 从打印可以看出,分片策略的FirstJob,任务分成两部分分别在A,B上运行。
  • 分片策略为主从的SingleJob,只在一台运行,停掉执行SingleJob的A服务,主从的SingleJob转移到B执行,而分片的所有任务也都在B上执行。

github demo

git remote add origin https://github.com/lidagen/elastic-job.git git push -u origin master

Last Updated:
Contributors: 88395515, wal365@126.com
Prev
消息中间件框架Kafka
Next
可视化分析工具Kibana