使用MySqlBulkLoader批量插入数据

打印 上一主题 下一主题

主题 527|帖子 527|积分 1581

 
最近在项目中遇到插入数据瓶颈,几万、几十万、几百万的数据保存到MYSQL数据库,使用EF插入数据速度非常慢,数据量非常大时EF插入需要几十分钟,甚至几个小时,这样子的速度肯定不是我们所期望的。
后面经过了解与研究发现MySqlBulkLoader,可以批量将数据插入到数据库并且速度上面远远优于EF。
MySqlBulkLoader主要的实现方式:将需要插入的数据转成DataTable,DataTable转成一个CSV文件,将CSV文件使用批量导入的形式导入到数据库里面去。
 
注意:
1).数据库连接地址需要添加配置AllowLoadLocalInfile=true,允许本地文件导入;
Data Source = 数据库地址; Port = 端口; Initial Catalog = 数据库名; User Id = 用户名; Password = 密码;AllowLoadLocalInfile=true;
2).插入的时候会返回插入行数,但是检查所有的数据都正确,也没有报异常,却返回了插入数量为0,可以检查表是否有唯一索引,插入的数据是否违反了唯一索引
(以下分块展示了代码,如果需要看完整的代码直接看 5.完整的代码) 
 
1.将List转化为DataTable 
  1.    /// <summary>
  2.         /// 将List转化为DataTable
  3.         /// </summary>
  4.         /// <returns></returns>
  5.         public DataTable ListToDataTable<T>(List<T> data)
  6.         {
  7.             #region 创建一个DataTable,以实体名称作为DataTable名称
  8.             var tableName = typeof(T).Name;
  9.             tableName = tableName.ToSnakeCase(); /*实体名称与表名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/
  10.             DataTable dt = new DataTable
  11.             {
  12.                 TableName = tableName
  13.             };
  14.             #endregion
  15.             #region 拿取列名,以实体的属性名作为列名      
  16.             var properties = typeof(T).GetProperties();
  17.             foreach (var item in properties)
  18.             {
  19.                 var curFileName = item.Name;
  20.                 curFileName = curFileName.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/
  21.                 dt.Columns.Add(curFileName);
  22.             }
  23.             #endregion
  24.             #region 列赋值
  25.             foreach (var item in data)
  26.             {
  27.                 DataRow dr = dt.NewRow();
  28.                 var columns = dt.Columns;
  29.                 var curPropertyList = item.GetType().GetProperties();
  30.                 foreach (var p in curPropertyList)
  31.                 {
  32.                     var name = p.Name;
  33.                     name = name.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/
  34.                     var curValue = p.GetValue(item);
  35.                     int i = columns.IndexOf(name);
  36.                     dr[i] = curValue;
  37.                 }
  38.                 dt.Rows.Add(dr);
  39.             }
  40.             #endregion  
  41.             return dt;
  42.         }
复制代码
  
2.将DataTable转换为标准的CSV文件 
  1.   /// <summary>
  2.     /// csv扩展
  3.     /// </summary>
  4.     public static class CSVEx
  5.     {
  6.         /// <summary>
  7.         ///将DataTable转换为标准的CSV文件
  8.         /// </summary>
  9.         /// <param name="table">数据表</param>
  10.         /// <param name="tmpPath">文件地址</param>
  11.         /// <returns>返回标准的CSV</returns>
  12.         public static void ToCsv(this DataTable table, string tmpPath)
  13.         {
  14.             //以半角逗号(即,)作分隔符,列为空也要表达其存在。
  15.             //列内容如存在半角逗号(即,)则用半角引号(即"")将该字段值包含起来。
  16.             //列内容如存在半角引号(即")则应替换成半角双引号("")转义,并用半角引号(即"")将该字段值包含起来。
  17.             StringBuilder sb = new StringBuilder();
  18.             DataColumn colum;
  19.             foreach (DataRow row in table.Rows)
  20.             {
  21.                 for (int i = 0; i < table.Columns.Count; i++)
  22.                 {
  23.                     Type _datatype = typeof(DateTime);
  24.                     colum = table.Columns[i];
  25.                     if (i != 0) sb.Append("\t");
  26.                     //if (colum.DataType == typeof(string) && row[colum].ToString().Contains(","))
  27.                     //{
  28.                     //    sb.Append(""" + row[colum].ToString().Replace(""", """") + """);
  29.                     //}
  30.                     if (colum.DataType == _datatype)
  31.                     {
  32.                         sb.Append(((DateTime)row[colum]).ToString("yyyy/MM/dd HH:mm:ss"));
  33.                     }
  34.                     else sb.Append(row[colum].ToString());
  35.                 }
  36.                 sb.Append("\r\n");
  37.             }
  38.             StreamWriter sw = new StreamWriter(tmpPath, false, UTF8Encoding.UTF8);
  39.             sw.Write(sb.ToString());
  40.             sw.Close();
  41.         }
  42.     }
复制代码
 
3.CSV文件导入数据到数据库
  1.     /// <summary>
  2.     /// 批量导入mysql帮助类
  3.     /// </summary>
  4.     public static class MySqlHelper
  5.     {
  6.         /// <summary>
  7.         /// MySqlBulkLoader批量导入
  8.         /// </summary>
  9.         /// <param name="_mySqlConnection">数据库连接地址</param>
  10.         /// <param name="table"></param>
  11.         /// <param name="csvName"></param>
  12.         /// <returns></returns>
  13.         public static int BulkLoad(MySqlConnection _mySqlConnection, DataTable table, string csvName)
  14.         {
  15.             var columns = table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList();
  16.             MySqlBulkLoader bulk = new MySqlBulkLoader(_mySqlConnection)
  17.             {
  18.                 FieldTerminator = "\t",
  19.                 FieldQuotationCharacter = '"',
  20.                 EscapeCharacter = '"',
  21.                 LineTerminator = "\r\n",
  22.                 FileName = csvName,
  23.                 NumberOfLinesToSkip = 0,
  24.                 TableName = table.TableName,
  25.             };
  26.             bulk.Columns.AddRange(columns);
  27.             return bulk.Load();
  28.         }
  29.     }
复制代码
  
4.使用MySqlBulkLoader批量插入数据
[code]        ///         /// 使用MySqlBulkLoader批量插入数据        ///         ///         ///         ///         ///         public int BulkLoaderData(List data)        {            if (data.Count
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

钜形不锈钢水箱

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表