extract-n-csv-files-and-load-to-table

استخراج n فایل CSV متفاوت و بارگذاری داده های آنها در جدول مقصد

ما همیشه برای بارگذاری فایل CSV، Data Flow Task را می آوریم و flat file را به عنوان سورس در نظر می‌گیریم و آن داده‌ها را در جدول مقصد (جدول Stage) بارگذاری می‌کنیم. اگر دو فایل داشته باشیم، اشکالی ندارد، می‌توانیم دو DFT را ایجاد کنیم و داده‌های آنها را در جدول بارگذاری کنیم.

فرض کنید ۱۰۰ فایل مختلف داریم، در این صورت چه کاری می‌توانیم انجام دهیم؟

فکر می کنید باید DFT را به ازای هر فایل ایجاد کنیم و نگاشت را انجام دهیم و آن داده‌ها را در پایگاه داده بارگذاری کنیم؟ این یک کار بسیار خسته‌کننده است. فرض کنید اگر مجبور باشیم این DFT های زیاد را دوباره ایجاد کنیم، اگر فایلی اضافه یا حذف شود، باید پکیج را به‌روزرسانی کنیم.

اگر چنین سناریویی دارید، نگران نباشید. در اینجا خواهیم دید که چگونه n نوع فایل مختلف را به راحتی بارگذاری کنیم.

ما n تعداد فایل مانند شکل زیر داریم. هر فایل دارای یک نام جدول است.

همه فایل‌ها ساختارهای متفاوتی دارند. در اینجا فرض می‌کنیم که فایل‌ها header ندارند. در مرحله بعد، بررسی خواهیم کرد که آیا این فایل‌ها header دارند یا خیر، سپس چگونه این فایل‌ها را بارگذاری خواهیم کرد.

ما باید متناظرا یکسری جدول staging هم برای ذخیره رکوردهای هر فایل ایجاد کنیم.

بیایید ببینیم چگونه آن را بارگذاری می‌کنیم.

برای بارگذاری این فایل‌ها، ابتدا باید کارهای زیر را انجام دهیم.

۱. نام جدول و نام فایل باید یکسان باشند. (اگر یکسان نباشند، در این صورت دوباره باید جدول پیکربندی را ایجاد کنیم تا نام فایل را با نام جدول نگاشت کنیم). در اینجا نام فایل‌ها و نام جداول هر دو یکسان هستند.

۲. ما جدول پیکربندی داریم که در آن لیست نام ستون‌ها را با جداکننده ذخیره می‌کنیم. در فایل‌ها، جداکننده Pipe (|) داریم. بنابراین باید ستون‌ها را با جداکننده pipe ذخیره کنیم.

۳. اگر فایلی اضافه می‌کنیم، باید لیست ستون‌های آن فایل‌ها را نیز اضافه کنیم.

۴. اگر در این مورد تغییری در ساختار فایل‌ها ایجاد می‌کنیم، باید لیست ستون‌ها را در جدول پیکربندی به‌روزرسانی کنیم.

۵. در اینجا برای همه جداول، نوع داده یکسانی داریم.

حالا ما در حال ایجاد پکیج هستیم.

از Foreach Loop Container برای شمارش فایل‌ها استفاده می‌شود.

ما چند متغیر داریم که در این پکیج استفاده می‌شوند.

این Execute SQL Task با نام SQL_Truncate_Table برای پاک کردن جدول staging استفاده می‌شود. ما ورودی را به عنوان file_name می‌گیریم که نام فایل را با مسیر کامل دارد. در مسیر کامل، نام فایل را استخراج می‌کنیم و به صورت پویا دستور truncate را ایجاد می‌کنیم و این کد زیر را در statement می نویسیم.

DECLARE @full_path VARCHAR(500)=?,
        @table_nm  VARCHAR(500),
        @sql_str   NVARCHAR(500);

SET @table_nm = Replace(RIGHT(@full_path, Charindex(‘\’, Reverse(@full_path))
                                          – 1), ‘.csv’, ”);
SET @sql_str =’Truncate table ‘ + @table_nm;

EXEC (@sql_str); 

متغیرها را به این صورت به Script Task پاس می دهیم:

و بعد درون متد اصلی Script Task کد زیر را می نوسیم:

public void Main()

                                {

          //Reading the file information

            string File_Path = Dts.Variables[“User::File_Name”].Value.ToString();

            FileInfo fi = new FileInfo(File_Path);

            string FileName = fi.Name;

            string table_Name= FileName.Replace(“.csv”, “”);

            char FileDelimiter = ‘|’;

            string targetPath = Dts.Variables[“User::Target_Path”].Value.ToString(); 

            //Reading the csv file and storing that data into the datatable

            try 

            {          

               //Reading the list of columns from the database creating the datatable and adding the list of columns in this datatable.

                SqlConnection con = new SqlConnection(“Data Source = BAGESH\\BAGESHDB; Initial Catalog =AdventureWorksDW2017_Stage; Integrated Security=true;”);

                string sql = “select Table_Name,Columns_name from tbl_columns_config  where Table_name = ‘” + table_Name +”‘”;

                con.Open();

                SqlCommand cmd = new SqlCommand(sql, con);

                cmd.CommandType = CommandType.Text; 

                SqlDataReader rd = cmd.ExecuteReader();

                string Columns = null;

                while (rd.Read())

                {               

                    Columns = rd.GetString(1);                  

                } 

                DataTable dt = new DataTable();

                string[] Columns_List = Columns.Split(FileDelimiter);

                foreach (string column in Columns_List) 

                {

                    dt.Columns.Add(column); 

                } 

                //Reading the CSV file and adding the data into the datatable. 

                using (StreamReader reader = new StreamReader(File_Path))

                {    

                    while (!reader.EndOfStream) 

                    {

//if we have header in this file we need to scape that header line here. 

                        string[] rows = reader.ReadLine().Split(FileDelimiter);

                        DataRow dr = dt.NewRow();

                        for (int i = 0; i < Columns_List.Length; i++)

                        {

                            dr[i] = rows[i];

                        }                       

                        dt.Rows.Add(dr);                       

                    } 

                } 

                con.Close();

                // Creating Sql bulk copy command and mapping the table columns and data table columns and inserting the records in the table.

                SqlBulkCopy obj = new SqlBulkCopy(con); 

                con.Open(); 

                //assign Destination table name

                obj.DestinationTableName = table_Name; 

                foreach (string column in Columns_List)

                {

                    obj.ColumnMappings.Add(column, column);

                }             

                   obj.WriteToServer(dt);               

                con.Close(); 

                //After loading the file we are moving the files to archive dir

                string destFile = Path.Combine(targetPath, FileName); 

                File.Move(File_Path, destFile);

            }

//if any error occurs

            catch (Exception ex)

            {

                Dts.Events.FireError(0, “Fire Error”, “An error occurred: ” + ex.Message.ToString(), “”, 0);

            } 

            Dts.TaskResult = (int)ScriptResults.Success;

                                }

پکیج را اجرا کنید. رکوردها را در تمام جداول مشاهده کنید. فولدر فایلهای سورس و فولدر فایلهای مقصد را هم ببینید.

داده ها بطورموفقیت آمیزی بارگذاری شدند.

اگر حالا فایلهای دیگری (مثلا دوتا فایل) هم اضافه کنیم. دوتا جدول معادل هم باید در دیتابیس ایجاد کنیم.

حالا باید این ستون‌های جدول را در جدول پیکربندی اضافه کنیم.

اگر دوباره پکیج را اجرا کنیم با موفقیت انجام خواهد شد. رکوردها را در جدول ببنیید:

بیایید چند فایل را تغییر دهیم (ستون‌هایی به فایل‌ها اضافه کنیم).

ابتدا باید ستون را به جدول و جدول پیکربندی اضافه کنیم.

اضافه کردن دو ستون

City, Sal

حالا جدول پیکربندی را به‌روزرسانی می‌کنیم.

فایل را ببینید:

حال پکیج را اجرا کنید. و بعد رکورد را در جدول مشاهده کنید.

برچسب ها: بدون برچسب

نظر بگذارید

آدرس ایمیل شما منتشر نخواهد شد. قسمتهای مورد نیاز علامت گذاری شده اند *