Sqoop incremental import in Cloudera Hadoop





In the last blog post, I described how to import data from an RDBMS into HDFS using Sqoop.

Now I will discuss how to do an incremental import through the Cloudera Hadoop user interface. If you know the basic functionality of Hadoop, this is a simple task!

To perform an incremental import in Sqoop, you need the ‘incremental’, ‘check-column’, and ‘last-value’ options.

The following syntax is used for the incremental import:
--incremental <mode>
--check-column <column name>
--last-value <last check column value>

Cloudera Hadoop is a commercial distribution of Hadoop. I am using the Oozie workflow UI provided by Cloudera to import data.

When you define workflows in the Oozie UI, you also need to give the correct file path for the JDBC driver. If you haven’t added the drivers yet, make sure you put all of them in a folder that everyone can access.

Log in to the Hue UI -> Workflows -> Editors -> Workflows



I executed the Sqoop job below, defined in XML format.


<sqoop xmlns="uri:oozie:sqoop-action:0.2">
  <job-tracker>yarnRM</job-tracker>
  <name-node>hdfs://nameservice1</name-node>
  <arg>import</arg>
  <arg>--connect</arg>
  <arg>jdbc:mysql://172.23.123.132:3306/ideabizAdmin</arg>
  <arg>--username</arg>
  <arg>hadoop</arg>
  <arg>--password</arg>
  <arg>hadoop</arg>
  <arg>--incremental</arg>
  <arg>append</arg>
  <arg>--table</arg>
  <arg>pin_charge_txn</arg>
  <arg>--check-column</arg>
  <arg>DATETIME</arg>
  <arg>--last-value</arg>
  <arg>2017-12-22 23:56:00</arg>
  <arg>--target-dir</arg>
  <arg>/user/thilina_02338/source/ideabiz_admin/pin/charge/data</arg>
  <arg>--as-textfile</arg>
  <arg>--fields-terminated-by</arg>
  <arg>"|"</arg>
  <arg>--null-string</arg>
  <arg>\0</arg>
  <arg>--null-non-string</arg>
  <arg>\0</arg>
  <arg>-m</arg>
  <arg>1</arg>
  <file>/user/oozie/share/lib/lib_20170927010640/sqoop/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</file>
  <configuration />
</sqoop>

Let’s discuss how we can use the incremental update in the Sqoop job.

In the job above, the check column is the DATETIME column of the table, so the last value is a timestamp. When you design the job in Oozie and schedule it with a coordinator, the last value can be taken from the system time and compared with the latest DATETIME value in the internal table.

Now there should be a question in your mind: can’t we use some other column instead of DATETIME? Yes, you can. But then you have to find a way to get the last value from your external/internal table and pass it to the scheduler. Hence using a timestamp makes your life easier.
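To make this a bit more concrete, here is a rough sketch of how a coordinator could hand the last value to the workflow. It assumes a daily schedule and a workflow property that I am calling lastValue (a made-up name, not taken from my actual job). coord:nominalTime, coord:dateOffset and coord:formatTime are Oozie coordinator EL functions. The fragment belongs inside the coordinator's action -> workflow element.

```xml
<!-- Sketch only: "lastValue" is a hypothetical property name, and a daily run is assumed -->
<configuration>
  <property>
    <name>lastValue</name>
    <!-- Scheduled (nominal) time of the previous daily run, formatted like the DATETIME column -->
    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), 'yyyy-MM-dd HH:mm:ss')}</value>
  </property>
</configuration>
```

In the sqoop action, the hard-coded value after --last-value would then be replaced by ${lastValue}. Oozie works in UTC unless it is configured otherwise, so this only lines up with the table if the database timestamps use the same time zone.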



Now we know how to automate the Sqoop job. Let’s discuss how to define a complete workflow for the Sqoop incremental import. Below is the logic I used.


1. Create a separate folder path in HDFS to receive the data from the RDBMS.

Eg: /user/thilina_02338/source/ideabiz_admin/pin/charge/data

Each time data is imported, all existing files in this folder are removed, which prevents data duplication in the external tables. In my scenario, I don’t have replica database servers for the RDBMS. Hence I split the import table by table and schedule each table separately, which helps to minimize the load on the relational database. That is why I used a separate path for each table. Defining the workflow for the first time takes a lot of work, but this layout helps you to rectify issues quickly.
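As far as I know, Sqoop's append mode only adds new files to the target directory, so the clean-up has to happen somewhere in the workflow. One possible way, sketched here under the assumption that the sqoop action itself does the clean-up (an fs action would be an alternative), is a prepare block:

```xml
<!-- Sketch only: goes inside the <sqoop> action, right after <name-node> -->
<prepare>
  <!-- Remove the files left by the previous run before the new import starts -->
  <delete path="hdfs://nameservice1/user/thilina_02338/source/ideabiz_admin/pin/charge/data"/>
</prepare>
```

The path is the same folder as above, prefixed with the name node used in the job.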

For the queries below, I’m using the Hive UI. It can save a query and import it into the workflow as well. You can also draw a few graphs based on the results of the query.


2. Now create an external table and point its location to the same folder path as above. All the data in that folder path then becomes available through the external table.

CREATE EXTERNAL TABLE IF NOT EXISTS ext_pin_charge_txn (
  ID INT,
  DATETIME TIMESTAMP,
  APIMGR_ID INT,
  MSISDN VARCHAR(20),
  APP_REF VARCHAR(100),
  SERVER_REF VARCHAR(50),
  DESCRIPTION VARCHAR(500),
  CALLBACK_URL VARCHAR(500),
  AMOUNT DOUBLE,
  TAX INT,
  PIN VARCHAR(7),
  RETRY_COUNT INT,
  STATUS VARCHAR(20),
  PAYMENT_STATUS VARCHAR(20),
  PAYMENT_REF VARCHAR(50),
  RESENT_COUNT INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/user/thilina_02338/source/ideabiz_admin/pin/charge/data';


3. Create an internal table with a partition key.

CREATE TABLE IF NOT EXISTS ideamart.pin_charge_txn (
  ID INT,
  DATETIME TIMESTAMP,
  APIMGR_ID INT,
  MSISDN VARCHAR(20),
  APP_REF VARCHAR(100),
  SERVER_REF VARCHAR(50),
  DESCRIPTION VARCHAR(500),
  CALLBACK_URL VARCHAR(500),
  AMOUNT DOUBLE,
  TAX INT,
  PIN VARCHAR(7),
  RETRY_COUNT INT,
  STATUS VARCHAR(20),
  PAYMENT_STATUS VARCHAR(20),
  PAYMENT_REF VARCHAR(50),
  RESENT_COUNT INT
)
PARTITIONED BY (time_key INT);


4. Insert the data from the external table into the internal table.

-- allow every partition value to be derived from the data
set hive.exec.dynamic.partition.mode=nonstrict;
-- time_key is the year and month of DATETIME as an integer, e.g. 201712
insert into table ideamart.pin_charge_txn partition(time_key)
select ID, DATETIME, APIMGR_ID, MSISDN, APP_REF, SERVER_REF, DESCRIPTION, CALLBACK_URL, AMOUNT, TAX, PIN, RETRY_COUNT, STATUS, PAYMENT_STATUS, PAYMENT_REF, RESENT_COUNT,
  cast(from_unixtime(unix_timestamp(cast(DATETIME as string), 'yyyy-MM-dd HH:mm:ss'), 'yyyyMM') as int) as time_key
from ext_pin_charge_txn;
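Put together, the import and the insert can be chained in one Oozie workflow. The skeleton below is only a sketch of that idea: the action names, the workflow name and the script file insert_pin_charge_txn.hql are placeholders I made up, and on some clusters the Hive step may need a hive2 action or extra configuration instead.

```xml
<!-- Sketch only: names and the script file are hypothetical -->
<workflow-app name="pin-charge-txn-incremental" xmlns="uri:oozie:workflow:0.5">
  <start to="sqoop-import"/>

  <action name="sqoop-import">
    <sqoop xmlns="uri:oozie:sqoop-action:0.2">
      <job-tracker>yarnRM</job-tracker>
      <name-node>hdfs://nameservice1</name-node>
      <arg>import</arg>
      <!-- the remaining <arg> and <file> elements of the sqoop job go here -->
    </sqoop>
    <ok to="hive-insert"/>
    <error to="fail"/>
  </action>

  <action name="hive-insert">
    <hive xmlns="uri:oozie:hive-action:0.2">
      <job-tracker>yarnRM</job-tracker>
      <name-node>hdfs://nameservice1</name-node>
      <!-- file holding the INSERT statement from step 4 -->
      <script>insert_pin_charge_txn.hql</script>
    </hive>
    <ok to="end"/>
    <error to="fail"/>
  </action>

  <kill name="fail">
    <message>Incremental import failed: ${wf:errorMessage(wf:lastErrorNode())}</message>
  </kill>
  <end name="end"/>
</workflow-app>
```

The Hive step only starts when the Sqoop step succeeds, so a failed import never loads a half-written folder into the internal table.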




Job scheduling can be done using a coordinator application. A coordinator allows the user to execute recurrent and interdependent workflows.
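A minimal coordinator definition could look roughly like the sketch below. The coordinator name and the startTime, endTime and workflowAppPath job properties are placeholders, and the daily frequency is just an assumption; pick whatever suits your table.

```xml
<!-- Sketch only: the name and the ${...} job properties are placeholders -->
<coordinator-app name="pin-charge-txn-coordinator"
                 frequency="${coord:days(1)}"
                 start="${startTime}" end="${endTime}" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.4">
  <action>
    <workflow>
      <!-- HDFS directory that contains the workflow.xml to run -->
      <app-path>${workflowAppPath}</app-path>
    </workflow>
  </action>
</coordinator-app>
```

In Hue you normally build this through the coordinator editor rather than writing the XML by hand, but the fields you fill in are the same.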

That’s it! Yayyyyy. Scroll down and comment if you have any queries regarding incremental data import. Would love to help :)

