ما همیشه برای بارگذاری فایل CSV، Data Flow Task را می آوریم و flat file را به عنوان سورس در نظر میگیریم و آن دادهها را در جدول مقصد (جدول Stage) بارگذاری میکنیم. اگر دو فایل داشته باشیم، اشکالی ندارد، میتوانیم دو DFT را ایجاد کنیم و دادههای آنها را در جدول بارگذاری کنیم.
فرض کنید ۱۰۰ فایل مختلف داریم، در این صورت چه کاری میتوانیم انجام دهیم؟
فکر می کنید باید DFT را به ازای هر فایل ایجاد کنیم و نگاشت را انجام دهیم و آن دادهها را در پایگاه داده بارگذاری کنیم؟ این یک کار بسیار خستهکننده است. فرض کنید اگر مجبور باشیم این DFT های زیاد را دوباره ایجاد کنیم، اگر فایلی اضافه یا حذف شود، باید پکیج را بهروزرسانی کنیم.
اگر چنین سناریویی دارید، نگران نباشید. در اینجا خواهیم دید که چگونه n نوع فایل مختلف را به راحتی بارگذاری کنیم.
ما n تعداد فایل مانند شکل زیر داریم. هر فایل دارای یک نام جدول است.

همه فایلها ساختارهای متفاوتی دارند. در اینجا فرض میکنیم که فایلها header ندارند. در مرحله بعد، بررسی خواهیم کرد که آیا این فایلها header دارند یا خیر، سپس چگونه این فایلها را بارگذاری خواهیم کرد.

ما باید متناظرا یکسری جدول staging هم برای ذخیره رکوردهای هر فایل ایجاد کنیم.

بیایید ببینیم چگونه آن را بارگذاری میکنیم.
برای بارگذاری این فایلها، ابتدا باید کارهای زیر را انجام دهیم.
۱. نام جدول و نام فایل باید یکسان باشند. (اگر یکسان نباشند، در این صورت دوباره باید جدول پیکربندی را ایجاد کنیم تا نام فایل را با نام جدول نگاشت کنیم). در اینجا نام فایلها و نام جداول هر دو یکسان هستند.
۲. ما جدول پیکربندی داریم که در آن لیست نام ستونها را با جداکننده ذخیره میکنیم. در فایلها، جداکننده Pipe (|) داریم. بنابراین باید ستونها را با جداکننده pipe ذخیره کنیم.

۳. اگر فایلی اضافه میکنیم، باید لیست ستونهای آن فایلها را نیز اضافه کنیم.
۴. اگر در این مورد تغییری در ساختار فایلها ایجاد میکنیم، باید لیست ستونها را در جدول پیکربندی بهروزرسانی کنیم.
۵. در اینجا برای همه جداول، نوع داده یکسانی داریم.
حالا ما در حال ایجاد پکیج هستیم.

از Foreach Loop Container برای شمارش فایلها استفاده میشود.
ما چند متغیر داریم که در این پکیج استفاده میشوند.

این Execute SQL Task با نام SQL_Truncate_Table برای پاک کردن جدول staging استفاده میشود. ما ورودی را به عنوان file_name میگیریم که نام فایل را با مسیر کامل دارد. در مسیر کامل، نام فایل را استخراج میکنیم و به صورت پویا دستور truncate را ایجاد میکنیم و این کد زیر را در statement می نویسیم.
DECLARE @full_path VARCHAR(500)=?, SET @table_nm = Replace(RIGHT(@full_path, Charindex(‘\’, Reverse(@full_path)) EXEC (@sql_str); |
متغیرها را به این صورت به Script Task پاس می دهیم:

و بعد درون متد اصلی Script Task کد زیر را می نوسیم:
public void Main()
{
//Reading the file information
string File_Path = Dts.Variables[“User::File_Name”].Value.ToString();
FileInfo fi = new FileInfo(File_Path);
string FileName = fi.Name;
string table_Name= FileName.Replace(“.csv”, “”);
char FileDelimiter = ‘|’;
string targetPath = Dts.Variables[“User::Target_Path”].Value.ToString();
//Reading the csv file and storing that data into the datatable
try
{
//Reading the list of columns from the database creating the datatable and adding the list of columns in this datatable.
SqlConnection con = new SqlConnection(“Data Source = BAGESH\\BAGESHDB; Initial Catalog =AdventureWorksDW2017_Stage; Integrated Security=true;”);
string sql = “select Table_Name,Columns_name from tbl_columns_config where Table_name = ‘” + table_Name +”‘”;
con.Open();
SqlCommand cmd = new SqlCommand(sql, con);
cmd.CommandType = CommandType.Text;
SqlDataReader rd = cmd.ExecuteReader();
string Columns = null;
while (rd.Read())
{
Columns = rd.GetString(1);
}
DataTable dt = new DataTable();
string[] Columns_List = Columns.Split(FileDelimiter);
foreach (string column in Columns_List)
{
dt.Columns.Add(column);
}
//Reading the CSV file and adding the data into the datatable.
using (StreamReader reader = new StreamReader(File_Path))
{
while (!reader.EndOfStream)
{
//if we have header in this file we need to scape that header line here.
string[] rows = reader.ReadLine().Split(FileDelimiter);
DataRow dr = dt.NewRow();
for (int i = 0; i < Columns_List.Length; i++)
{
dr[i] = rows[i];
}
dt.Rows.Add(dr);
}
}
con.Close();
// Creating Sql bulk copy command and mapping the table columns and data table columns and inserting the records in the table.
SqlBulkCopy obj = new SqlBulkCopy(con);
con.Open();
//assign Destination table name
obj.DestinationTableName = table_Name;
foreach (string column in Columns_List)
{
obj.ColumnMappings.Add(column, column);
}
obj.WriteToServer(dt);
con.Close();
//After loading the file we are moving the files to archive dir
string destFile = Path.Combine(targetPath, FileName);
File.Move(File_Path, destFile);
}
//if any error occurs
catch (Exception ex)
{
Dts.Events.FireError(0, “Fire Error”, “An error occurred: ” + ex.Message.ToString(), “”, 0);
}
Dts.TaskResult = (int)ScriptResults.Success;
}
پکیج را اجرا کنید. رکوردها را در تمام جداول مشاهده کنید. فولدر فایلهای سورس و فولدر فایلهای مقصد را هم ببینید.




داده ها بطورموفقیت آمیزی بارگذاری شدند.
اگر حالا فایلهای دیگری (مثلا دوتا فایل) هم اضافه کنیم. دوتا جدول معادل هم باید در دیتابیس ایجاد کنیم.
حالا باید این ستونهای جدول را در جدول پیکربندی اضافه کنیم.



اگر دوباره پکیج را اجرا کنیم با موفقیت انجام خواهد شد. رکوردها را در جدول ببنیید:

بیایید چند فایل را تغییر دهیم (ستونهایی به فایلها اضافه کنیم).
ابتدا باید ستون را به جدول و جدول پیکربندی اضافه کنیم.
اضافه کردن دو ستون
City, Sal

حالا جدول پیکربندی را بهروزرسانی میکنیم.

فایل را ببینید:

حال پکیج را اجرا کنید. و بعد رکورد را در جدول مشاهده کنید.



نظر بگذارید