Spring-Batch-Getting-Start
一、为什么需要批处理?
1. 应用场景解析
场景1:银行每日利息计算
- 痛点:凌晨时段需扫描百万级账户数据,手工计算容易遗漏
- Spring Batch方案:分片读取账户数据,批量计算利息,失败自动重试
- 实际案例:某银行系统改造后,利息计算时间从4小时缩短至23分钟
场景2:电商订单归档
// 传统SQL示例(存在性能问题)
DELETE FROM active_orders
WHERE create_time < '2023-01-01'
LIMIT 5000; // 需循环执行直到无数据
- 问题:直接删除百万级数据会导致数据库锁表
- 正确做法:使用Spring Batch分页读取→写入历史表→批量删除
场景3:日志分析
- 典型需求:分析Nginx日志中的API响应时间分布
- 特殊挑战:处理GB级文本文件时的内存控制
场景4:医疗数据迁移
- 特殊要求:迁移过程中老系统仍需 正常使用
- 解决方案:使用Spring Batch的增量迁移模式
2. 传统方式痛点
详细解释每个痛点:
- 资源管理复杂
// 典型的多线程错误示例
ExecutorService executor = Executors.newFixedThreadPool(8);
try {
while(hasNextPage()) {
List<Data> page = fetchNextPage();
executor.submit(() -> processPage(page)); // 可能引发内存泄漏
}
} finally {
executor.shutdown(); // 忘记调用会导致线程堆积
}
常见问题:线程池配置不当导致OOM、数据库连接泄露
- 容错性黑洞
// 伪代码:脆弱的错误处理
for (int i=0; i<3; i++) {
try {
processBatch();
break;
} catch (Exception e) {
if (i == 2) sendAlert(); // 简单重试无法处理部分成功场景
}
}
真实案例:某支付系统因未处理部分失败,导致重复出款
- 维护噩梦
# 典型硬编码配置
batch.size=1000
input.path=/data/in
output.path=/data/out
问题根源:参数修改需要重新部署、不同环境配置混杂
- 监控盲区
# 开发人员常用的临时方案
nohup java -jar batch.jar > log.txt 2>&1 &
tail -f log.txt # 无法获知实时进度
关键缺陷:无法回答"处理到哪了?"、"还剩多少?"等业务问题
Spring Batch对比优势表
| 功能点 | 传统方式 | Spring Batch方案 |
|---|---|---|
| 任务重启 | 需从零开始 | 支持断点续处理 |
| 事务管理 | 手动控制commit/rollback | 自动分块事务 |
| 错误处理 | try-catch嵌套地狱 | Skip/Retry策略声明式配置 |
| 监控 | 查看日志文件 | 数据库存储执行元数据 |
| 扩展性 | 修改代码才能增加处理步骤 | 通过Step组合灵活编排 |
二、Spring Batch核心架构
1. 四大金刚组件深度解析
组件1:Job(作业工厂)
- 核心作用:定义完整的批处理流水线(如月度报表生成流程)
- 真实案例:某银行的日终对账Job包含三个Step
@Bean
public Job reconciliationJob() {
return jobBuilderFactory.get("dailyReconciliation")
.start(downloadBankFileStep())
.next(validateDataStep())
.next(generateReportStep())
.build();
}
组件2:Step(装配流水线)
- 设计模式:采用分块(Chunk)处理机制
- 配 置示例:
@Bean
public Step importStep() {
return stepBuilderFactory.get("csvImport")
.<User, User>chunk(500) // 每500条提交一次
.reader(csvReader())
.processor(validationProcessor())
.writer(dbWriter())
.faultTolerant()
.skipLimit(10)
.skip(DataIntegrityViolationException.class)
.build();
}
组件3:ItemReader(数据搬运工)
- 典型实现:
// 读取CSV文件示例
@Bean
public FlatFileItemReader<User> csvReader() {
return new FlatFileItemReaderBuilder<User>()
.name("userReader")
.resource(new FileSystemResource("data/users.csv"))
.delimited().delimiter(",")
.names("id", "name", "email")
.fieldSetMapper(new BeanWrapperFieldSetMapper<User>() {{
setTargetType(User.class);
}})
.linesToSkip(1) // 跳过标题行
.build();
}
组件4:ItemWriter(数据收纳师)
复合写入示例:
@Bean
public CompositeItemWriter<User> compositeWriter() {
return new CompositeItemWriterBuilder<User>()
.delegates(dbWriter(), logWriter(), mqWriter())
.build();
}
// 数据库写入组件
private JdbcBatchItemWriter<User> dbWriter() {
return new JdbcBatchItemWriterBuilder<User>()
.dataSource(dataSource)
.sql("INSERT INTO users (name,email) VALUES (:name,:email)")
.beanMapped()
.build();
}
2. 架构示意图
3. 隐藏BOSS:ItemProcessor(数据变形金刚)
典型应用:数据脱敏处理
public class DataMaskProcessor implements ItemProcessor<User, User> {
@Override
public User process(User user) {
// 手机号脱敏
String phone = user.getPhone();
user.setPhone(phone.replaceAll("(\\d{3})\\d{4}(\\d{4})", "$1****$2"));
// 邮箱转小写
user.setEmail(user.getEmail().toLowerCase());
return user;
}
}
4. 组件生命周期探秘
三、手把手开发指南
1. 环境搭建
<!-- 完整POM配置 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>
<dependencies>
<!-- Batch核心依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- 内存数据库(生产环境可更换为MySQL等) -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Lombok简化代码 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
# application.properties
spring.batch.jdbc.initialize-schema=always # 自动创建Batch元数据表
spring.datasource.url=jdbc:h2:mem:testdb
spring.datasource.driverClassName=org.h2.Driver
2. 第一个批处理任务
领域模型类:
@Data // Lombok注解
@NoArgsConstructor
@AllArgsConstructor
public class User {
private String name;
private int age;
private String email;
}
完整Job配置:
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired private JobBuilderFactory jobBuilderFactory;
@Autowired private StepBuilderFactory stepBuilderFactory;
// 定义Job
@Bean
public Job importUserJob() {
return jobBuilderFactory.get("importUserJob")
.start(csvProcessingStep())
.build();
}
// 定义Step
@Bean
public Step csvProcessingStep() {
return stepBuilderFactory.get("csvProcessing")
.<User, User>chunk(100) // 每处理100条提交一次
.reader(userReader())
.processor(userProcessor())
.writer(userWriter())
.build();
}
// CSV文件读取器
@Bean
public FlatFileItemReader<User> userReader() {
return new FlatFileItemReaderBuilder<User>()
.name("userReader")
.resource(new ClassPathResource("users.csv")) // 文 件路径
.delimited()
.delimiter(",")
.names("name", "age", "email") // 字段映射
.targetType(User.class)
.linesToSkip(1) // 跳过标题行
.build();
}
// 数据处理(示例:年龄校验)
@Bean
public ItemProcessor<User, User> userProcessor() {
return user -> {
if (user.getAge() < 0) {
throw new IllegalArgumentException("年龄不能为负数: " + user);
}
return user.toBuilder() // 使用Builder模式创建新对象
.email(user.getEmail().toLowerCase())
.build();
};
}
// 数据库写入器
@Bean
public JdbcBatchItemWriter<User> userWriter(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<User>()
.dataSource(dataSource)
.sql("INSERT INTO users (name, age, email) VALUES (:name, :age, :email)")
.beanMapped()
.build();
}
}
CSV文件示例(src/main/resources/users.csv):
name,age,email
张三,25,zhangsan@example.com
李四,30,lisi@example.com
王五,-5,wangwu@example.com
启动类:
@SpringBootApplication
public class BatchApplication implements CommandLineRunner {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importUserJob;
public static void main(String[] args) {
SpringApplication.run(BatchApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
JobParameters params = new JobParametersBuilder()
.addLong("startAt", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(importUserJob, params);
}
}
3. 执行流程可视化
4. 运行效果验证
控制台输出:
2023-10-01 10:00:00 INFO o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=importUserJob]] launched
2023-10-01 10:00:05 INFO o.s.batch.core.job.SimpleStepHandler - Executing step: [csvProcessing]
2023-10-01 10:00:15 ERROR o.s.batch.core.step.AbstractStep - Encountered an error executing step csvProcessing
org.springframework.batch.item.validator.ValidationException: 年龄不能为负数: User(name=王五, age=-5, email=wangwu@example.com)
数据库结果:
SELECT * FROM users;
| name | age | |
|---|---|---|
| 张三 | 25 | zhangsan@example.com |
| 李四 | 30 | lisi@example.com |
5. 调试技巧
查看元数据:
SELECT * FROM BATCH_JOB_INSTANCE;
SELECT * FROM BATCH_STEP_EXECUTION;
重试失败任务:
// 在Job配置中添加容错机制
@Bean
public Step csvProcessingStep() {
return stepBuilderFactory.get("csvProcessing")
.<User, User>chunk(100)
.reader(userReader())
.processor(userProcessor())
.writer(userWriter())
.faultTolerant()
.skipLimit(3) // 最多跳过3条错误
.skip(IllegalArgumentException.class)
.build();
}
日志监控配置:
logging.level.org.springframework.batch=DEBUG
logging.level.org.hibernate.SQL=WARN