WebReactive动态sql + 动态数据源切换


spring data r2dbc 动态sql

关键代码

package show.lmm.spring_data_r2dbc.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import show.lmm.spring_data_r2dbc.core.DynamicDataSource;
import show.lmm.spring_data_r2dbc.core.entity.UserInfo;
import show.lmm.spring_data_r2dbc.mapper.UserRepository;

/**
 * 用户 service
 */
@Service
public class UserService {
    private final UserRepository userRepository;

    @Autowired
    private final DatabaseClient dbClient;

    public UserService(
            UserRepository userRepository,
            DatabaseClient dbClient) {
        this.userRepository = userRepository;
        this.dbClient = dbClient;
    }

    /**
     * 查询用户信息
     *
     * @param userId 用户id
     */
    public Mono<UserInfo> get(int userId) {
        return userRepository.findById(userId);
    }

    /**
     * 查询最大的用户id
     */
    public Mono<Integer> getMaxUserId() {
        return dbClient.sql("select max(user_id) user_id from user_info where status = 1")
                .fetch().one()
                .flatMap(result -> {
                    if (result.get("user_id") instanceof Integer v) {
                        return Mono.just(v);
                    }
                    return Mono.just(0);
                });
    }

    /**
     * 查询用户列表
     */
    public Flux<UserInfo> list(String dbName) {
        return dbClient.sql("select user_id,user_name from user_info where status = 1")
                .fetch().all()
                .flatMap(result -> {
                    UserInfo userInfo = new UserInfo();
                    if (result.get("user_id") instanceof Integer userId) {
                        userInfo.setUserId(userId);
                    }
                    if (result.get("user_name") instanceof String userName) {
                        userInfo.setUserName(userName);
                    }
                    return Mono.just(userInfo);
                })
                .contextWrite(context -> context.put(DynamicDataSource.CURRENT_DB_NAME, dbName));
    }

    /**
     * 新增用户
     *
     * @param userName 用户名
     */
    public Mono<Boolean> insert(String userName) {
        return dbClient.sql("insert into user_info(user_name) values(:userName)")
                .bind("userName", userName)
                .fetch().rowsUpdated().hasElement();
    }

    /**
     * 新增用户
     *
     * @param userId 用户id
     */
    public Mono<Boolean> delete(int userId) {
        return dbClient.sql("delete from user_info where user_id= :userId")
                .bind("userId", userId)
                .fetch().rowsUpdated().hasElement();
    }

    /**
     * 测试事务
     */
    @Transactional(rollbackFor = Exception.class)
    public Flux<UserInfo> testTransaction(String userName) {
        insert(userName);
        if (1 == 1) {
            throw new RuntimeException("test error");
        }
        return list("tenant1");
    }
}

spring data r2dbc动态数据源实现方案

r2dbc提供了org.springframework.r2dbc.connection.lookup.AbstractRoutingConnectionFactory抽象类,使用时需要自己实现determineCurrentLookupKey方法来返回当前数据源的key,并通过setDefaultTargetConnectionFactory方法设置默认数据源,以达到动态切换数据源的目的。

项目中实现动态数据源的流程如下

  • 初始化DynamicDataSource Bean,并加载所有数据源
  • 设置DynamicDataSource的默认数据源
  • 重写determineCurrentLookupKey方法,返回动态数据源key

关键代码如下

/**
 * Routing connection factory that picks the target data source from the
 * Reactor context of the current subscription.
 */
public class DynamicDataSource extends AbstractRoutingConnectionFactory {

    // Registered data sources keyed by database name. Insertion-ordered so that
    // "first" in getFirstConnection() is well defined (a plain HashMap has no order).
    private final Map<String, ConnectionFactory> connectionFactoryMap = new java.util.LinkedHashMap<>();

    // Reactor context key under which the database name is stored
    public static final String CURRENT_DB_NAME = "dbName";

    /**
     * Returns the connection factory that was registered first.
     *
     * @return the first registered connection factory, or {@code null} when
     *         none has been added yet
     */
    public ConnectionFactory getFirstConnection() {
        if (connectionFactoryMap.isEmpty()) {
            return null;
        }
        return connectionFactoryMap.values().iterator().next();
    }

    /**
     * Hands every registered data source to the routing base class.
     *
     * <p>NOTE(review): the base class presumably resolves this map during
     * initialization, so data sources added afterwards may not be routable
     * until it is re-initialized. Confirm against the bean setup.
     */
    public void setTargetConnectionFactories() {
        super.setTargetConnectionFactories(connectionFactoryMap);
    }

    /**
     * Sets the default target; delegates unchanged to the base class.
     *
     * @param defaultTargetConnectionFactory default target passed through to the base class
     */
    @Override
    public void setDefaultTargetConnectionFactory(Object defaultTargetConnectionFactory) {
        super.setDefaultTargetConnectionFactory(defaultTargetConnectionFactory);
    }

    /**
     * Reads the database name stored under {@link #CURRENT_DB_NAME} in the
     * Reactor context.
     *
     * @return a {@link Mono} emitting the database name, or an empty
     *         {@link Mono} when the context carries no such entry
     */
    @Override
    protected Mono<Object> determineCurrentLookupKey() {
        return Mono.deferContextual(contextView -> {
            final String dbName = contextView.getOrDefault(CURRENT_DB_NAME, null);
            // Emits the name when present and completes empty otherwise.
            return Mono.justOrEmpty(dbName);
        });
    }

    /**
     * Registers a data source.
     *
     * @param dbName        database name used as the lookup key
     * @param connectionUrl R2DBC connection URL
     */
    public void addDataSource(String dbName, String connectionUrl) {
        connectionFactoryMap.put(dbName, ConnectionFactories.get(connectionUrl));
    }
}

示例

https://gitee.com/luoye/examples/tree/main/spring_data_r2dbc


文章作者: Ming Ming Liu
文章链接: https://www.lmm.show/29/
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Ming Ming Liu !
  目录