MySQL中实现 upsert 操作 & Java 实现

Dec 20, 2018 阅读(2576)

标签: Mysql Java

该操作的实现原理是通过判断插入的记录里是否存在主键冲突来决定是插入还是更新,当出现主键冲突时则进行更新操作(使用 ON DUPLICATE KEY UPDATE 语句后面的参数),若无冲突则进行插入操作。

-- prepared sql
INSERT INTO td_person(id, name, age) 
VALUES(?, ?, ?) ON DUPLICATE KEY UPDATE id = ?, name = ?, age = ?

-- 实例
INSERT INTO td_person( id, name,age )
VALUES ( 1, 'xuexiyuan.cn', 24 )
ON DUPLICATE KEY UPDATE
        id = 1,
        name = 'xuexiyuan.cn',
        age = 24

 

JAVA 数据库 upsert 工具类, DbUtils.java  :

package cn.xuexiyuan.blog.util;

import java.sql.*;
import java.util.*;
import java.util.Date;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

public class DbUtils {
    private static final Logger log = Logger.getLogger(DbUtils.class);

    /**
     * 绑定元素
     *
     * @param pstmt
     * @param values
     * @throws SQLException
     */
    private static void preparedStatementBind(PreparedStatement pstmt, List<Object> values) throws SQLException {
        for (int j = 0; j < values.size(); j++) {
            Object val = values.get(j);
            log.debug(j + " 预绑定sql:" + val);
            if (val instanceof Date) {
                val = new java.sql.Timestamp(((Date) val).getTime());
            }
            pstmt.setObject(j + 1, val);
        }
    }

    /**
     * upsert操作
     *
     * @param conn        数据库连接
     * @param tableName   数据表名
     * @param dataMap     数据Map
     * @param dbType      数据库类型,mysql | oracle | sqlserver
     * @param primaryKeys 主键,联合主键键之间用逗号分割
     * @return
     * @throws SQLException
     */
    public static int upsert(Connection conn, String tableName, Map<String, Object> dataMap, String dbType, String primaryKeys) throws SQLException {
        int result = 0;
        if (null == conn) {
            log.warn("conn is null");
            return -1;
        }

        if (tableName == null || "".equals(tableName)) {
            log.warn("tableName is blank");
            return -1;
        }

        if (null == dataMap || dataMap.size() == 0) {
            log.warn("dataMap is empty");
            return -1;
        }

        if (StringUtils.isBlank(primaryKeys)) {
            log.warn("upsert 操作时 primaryKeys 不允许为空");
            return -1;
        }

        List<String> paimaryKeyList = Arrays.asList(primaryKeys.split(","));

        //去 null 操作
        int size = dataMap.size();
        StringBuilder fileds = new StringBuilder();
        List<Object> values = new ArrayList<>(size);
        StringBuilder placeholders = new StringBuilder();
        StringBuilder dupKeyUpd = new StringBuilder();
        List<Object> dupKeyUpdValues = new ArrayList<>(paimaryKeyList.size());
        Map deleteWhere = new HashMap();
        for (Entry<String, Object> entry : dataMap.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            if (null != key && null != value) {
                fileds.append(",").append(key);
                placeholders.append(",?");
                values.add(value);

                if (paimaryKeyList.contains(key)) {
                    deleteWhere.put(key, value);

                    if ("mysql".equalsIgnoreCase(dbType)) {
                        dupKeyUpd.append(", ").append(key).append("=?");
                        dupKeyUpdValues.add(value);
                    }
                }
            }
        }

        if ("mysql".equalsIgnoreCase(dbType)) {
            PreparedStatement pstmt = null;    //实例化预编译语句
            String sql = String.format("insert into %s(%s) values(%s) on duplicate key update %s", tableName, fileds.toString().substring(1), placeholders.toString().substring(1), dupKeyUpd.toString().substring(1));
            try {
                log.debug("upsert sql: " + sql);
                pstmt = conn.prepareStatement(sql);
                preparedStatementBind(pstmt, values);

                for (int j = 0; j < dupKeyUpdValues.size(); j++) {
                    Object val = dupKeyUpdValues.get(j);
                    if (val instanceof Date) {
                        val = new java.sql.Timestamp(((Date) val).getTime());
                    }
                    log.debug((values.size() + j + 1) + " 预绑定sql:" + val);
                    pstmt.setObject(values.size() + j + 1, val);
                }
                result = pstmt.executeUpdate();
                log.debug("upsert executeUpdate finishion. result: " + result);
            } catch (Exception e) {
                log.error("upsert executeUpdate 异常", e);
            } finally {
                DbUtils.releaseResource(pstmt);
            }
        } else {
            try {
                conn.setAutoCommit(false);
                DbUtils.delete(conn, tableName, deleteWhere);
                DbUtils.insert(conn, tableName, dataMap);
                conn.commit();
            } catch (Exception e) {
                log.error("upsert execute 异常, 准备回滚事物", e);
                conn.rollback();
            }
        }

        return result;
    }

    /**
     * 删除数据
     *
     * @param conn
     * @param tableName
     * @param deleteWhere
     * @return
     * @throws SQLException
     */
    public static boolean delete(Connection conn, String tableName, Map<String, Object> deleteWhere) throws SQLException {
        boolean result = false;
        PreparedStatement pstmt = null;    //实例化预编译语句

        String delSql = String.format("delete from %s", tableName);
        List values = null;
        if (null != deleteWhere && deleteWhere.size() > 0) {
            values = new ArrayList();
            StringBuilder wssb = new StringBuilder();
            for (Entry<String, Object> entry : deleteWhere.entrySet()) {
                String key = entry.getKey();
                Object value = entry.getValue();
                if (null != key && null != value) {
                    wssb.append(",").append(key).append("=?");
                    values.add(value);
                }
            }
            delSql = delSql + " where " + wssb.toString().substring(1);
        }

        try {
            log.debug("delete sql: " + delSql);
            pstmt = conn.prepareStatement(delSql);    // 实例化预编译语句
            if (null != values && values.size() > 0) {
                preparedStatementBind(pstmt, values);
            }

            result = pstmt.execute();
            log.debug("delete execute finishion. result: " + result);
        } catch (Exception e) {
            log.error("delete execute 异常", e);
        } finally {
            DbUtils.releaseResource(pstmt);
        }

        return result;
    }

    /**
     * 插入操作
     *
     * @param conn      数据库连接
     * @param tableName 数据表名
     * @param dataMap   数据Map
     * @return
     * @throws SQLException
     */
    public static boolean insert(Connection conn, String tableName, Map<String, Object> dataMap) throws SQLException {
	    boolean result = false;
		if(null == conn ) {
			log.warn("conn is null");
			return false;
		}
		
		if(tableName == null || "".equals(tableName)) {
			log.warn("tableName is blank");
			return false;
		}
		
		if(null == dataMap || dataMap.size() == 0) {
			log.warn("dataMap is empty");
			return false;
		}
		
		//去 null 操作
		int size = dataMap.size();
        StringBuilder fileds = new StringBuilder();
        List<Object> values = new ArrayList<>(size);
        StringBuilder placeholders = new StringBuilder();
		for (Entry<String, Object> entry : dataMap.entrySet()) {
			String key = entry.getKey();
			Object value = entry.getValue();
			if(null != key && null != value) {
                fileds.append(",").append(key);
				placeholders.append(",?");
				values.add(value);
			}
		}
		
		String sql = String.format("insert into %s(%s) values(%s)", tableName, fileds.toString().substring(1), placeholders.toString().substring(1));
        log.debug("insert sql: " + sql);
        PreparedStatement pstmt = null;
        try {
            pstmt = conn.prepareStatement(sql);
            preparedStatementBind(pstmt, values);
            result = pstmt.execute();
            log.debug("insert execute finishion. result: " + result);
		} catch (Exception e) {
            log.error("insert 异常", e);
		}finally {
            pstmt.close();
        }
		
		return result;
	}


    /**
     * 释放资源
     *
     * @param closeable
     */
    public static void releaseResource(AutoCloseable... closeable) {
        if(null != closeable && closeable.length > 0 ){
            for (AutoCloseable o : closeable){
                if(null != o){
                    try {
                        o.close();
                        log.debug(o.getClass().getName() + " ,close 释放资源成功");
                    } catch (Exception e) {
                        log.debug(o.getClass().getName() + " ,close 异常", e);
                    }
                }
            }
        }
    }
	
}


MongoDB学习园