Spring Data R2DBC 动态 SQL
关键代码
package show.lmm.spring_data_r2dbc.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import show.lmm.spring_data_r2dbc.core.DynamicDataSource;
import show.lmm.spring_data_r2dbc.core.entity.UserInfo;
import show.lmm.spring_data_r2dbc.mapper.UserRepository;
/**
* 用户 service
*/
@Service
public class UserService {
private final UserRepository userRepository;
@Autowired
private final DatabaseClient dbClient;
public UserService(
UserRepository userRepository,
DatabaseClient dbClient) {
this.userRepository = userRepository;
this.dbClient = dbClient;
}
/**
* 查询用户信息
*
* @param userId 用户id
*/
public Mono<UserInfo> get(int userId) {
return userRepository.findById(userId);
}
/**
* 查询最大的用户id
*/
public Mono<Integer> getMaxUserId() {
return dbClient.sql("select max(user_id) user_id from user_info where status = 1")
.fetch().one()
.flatMap(result -> {
if (result.get("user_id") instanceof Integer v) {
return Mono.just(v);
}
return Mono.just(0);
});
}
/**
* 查询用户列表
*/
public Flux<UserInfo> list(String dbName) {
return dbClient.sql("select user_id,user_name from user_info where status = 1")
.fetch().all()
.flatMap(result -> {
UserInfo userInfo = new UserInfo();
if (result.get("user_id") instanceof Integer userId) {
userInfo.setUserId(userId);
}
if (result.get("user_name") instanceof String userName) {
userInfo.setUserName(userName);
}
return Mono.just(userInfo);
})
.contextWrite(context -> context.put(DynamicDataSource.CURRENT_DB_NAME, dbName));
}
/**
* 新增用户
*
* @param userName 用户名
*/
public Mono<Boolean> insert(String userName) {
return dbClient.sql("insert into user_info(user_name) values(:userName)")
.bind("userName", userName)
.fetch().rowsUpdated().hasElement();
}
/**
* 新增用户
*
* @param userId 用户id
*/
public Mono<Boolean> delete(int userId) {
return dbClient.sql("delete from user_info where user_id= :userId")
.bind("userId", userId)
.fetch().rowsUpdated().hasElement();
}
/**
* 测试事务
*/
@Transactional(rollbackFor = Exception.class)
public Flux<UserInfo> testTransaction(String userName) {
insert(userName);
if (1 == 1) {
throw new RuntimeException("test error");
}
return list("tenant1");
}
}
spring data r2dbc动态数据源实现方案
Spring R2DBC 提供了抽象类 `org.springframework.r2dbc.connection.lookup.AbstractRoutingConnectionFactory`。继承该类并实现 `determineCurrentLookupKey` 方法,再通过 `setDefaultTargetConnectionFactory` 设置默认数据源,即可达到动态切换数据源的目的。
项目中实现动态数据源的流程如下
- 初始化DynamicDataSource Bean,并加载所有数据源
- 设置DynamicDataSource的默认数据源
- 重写 `determineCurrentLookupKey` 方法,返回动态数据源 key
关键代码如下
/**
 * Dynamic data source.
 *
 * <p>Keeps a name-to-{@link ConnectionFactory} registry and chooses the lookup key
 * from the Reactor context entry stored under {@link #CURRENT_DB_NAME}.
 */
public class DynamicDataSource extends AbstractRoutingConnectionFactory {

    // Registered data sources, kept in registration order so that
    // getFirstConnection() is deterministic.
    private final Map<String, ConnectionFactory> connectionFactoryMap = new java.util.LinkedHashMap<>();

    // Reactor context key that holds the database name.
    public static final String CURRENT_DB_NAME = "dbName";

    /**
     * Returns the first registered connection factory.
     *
     * @return the connection factory that was added first, or {@code null} when
     *         nothing has been registered yet
     */
    public ConnectionFactory getFirstConnection() {
        if (connectionFactoryMap.isEmpty()) {
            return null;
        }
        return connectionFactoryMap.values().iterator().next();
    }

    /**
     * Hands the registered data sources to the parent class as the routing targets.
     */
    public void setTargetConnectionFactories() {
        super.setTargetConnectionFactories(connectionFactoryMap);
    }

    /**
     * Sets the default target connection factory by delegating to the parent class.
     *
     * @param defaultTargetConnectionFactory value forwarded unchanged to the parent
     */
    @Override
    public void setDefaultTargetConnectionFactory(Object defaultTargetConnectionFactory) {
        super.setDefaultTargetConnectionFactory(defaultTargetConnectionFactory);
    }

    /**
     * Resolves the lookup key from the subscriber's Reactor context.
     *
     * @return a {@link Mono} emitting the value stored under {@link #CURRENT_DB_NAME},
     *         or an empty {@link Mono} when the context has no such entry
     */
    @Override
    protected Mono<Object> determineCurrentLookupKey() {
        return Mono.deferContextual(context ->
                Mono.justOrEmpty(context.<Object>getOrEmpty(CURRENT_DB_NAME)));
    }

    /**
     * Registers a data source.
     *
     * <p>NOTE(review): this only updates the local registry; presumably the parent
     * class must be (re-)initialised before a newly added entry is routable - confirm.
     *
     * @param dbName        database name, used as the lookup key
     * @param connectionUrl connection URL passed to {@code ConnectionFactories.get}
     */
    public void addDataSource(String dbName, String connectionUrl) {
        connectionFactoryMap.put(dbName, ConnectionFactories.get(connectionUrl));
    }
}
示例
https://gitee.com/luoye/examples/tree/main/spring_data_r2dbc