Sqoop incremental import in Cloudera Hadoop
In the last blog post, I described how we can import data from an RDBMS to HDFS using Sqoop. Now I will discuss how we can do an incremental import through the Cloudera Hadoop user interface. If you know the basic functionality of Hadoop, this is a simple task!
You need to use the ‘incremental’, ‘check-column’, and ‘last-value’ options to perform an incremental import in Sqoop.
The following syntax is used for the incremental import:
--incremental <mode>
--check-column <column name>
--last-value <last check column value>
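For reference, a plain command-line import using these options might look like this (just a sketch; the connection details and names here are placeholders that you should replace with your own):
sqoop import \
  --connect jdbc:mysql://<host>:<port>/<database> \
  --username <user> \
  --password <password> \
  --table <table name> \
  --incremental append \
  --check-column <column name> \
  --last-value <last check column value> \
  --target-dir <HDFS path> \
  -m 1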
Cloudera Hadoop is a commercial distribution of Hadoop. I am using the Oozie workflow UI provided by Cloudera to import the data. When you define workflows in the Oozie UI, you also need to give the correct file path for the JDBC driver. If you haven’t added the drivers yet, please make sure you place them in a folder that can be accessed by everyone.
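For example, if the MySQL connector jar is not in HDFS yet, you could upload it to a shared location such as the Oozie sharelib path I reference in the job below (only an illustration; adjust the path to match your cluster):
hdfs dfs -put mysql-connector-java-5.1.38-bin.jar /user/oozie/share/lib/lib_20170927010640/sqoop/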
Log in to the Hue UI -> Workflows -> Editors -> Workflows
I executed the below Sqoop job, defined in XML format.
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>yarnRM</job-tracker>
<name-node>hdfs://nameservice1</name-node>
<arg>import</arg>
<arg>--connect</arg>
<arg>jdbc:mysql://172.23.123.132:3306/ideabizAdmin</arg>
<arg>--username</arg>
<arg>hadoop</arg>
<arg>--password</arg>
<arg>hadoop</arg>
<arg>--incremental</arg>
<arg>append</arg>
<arg>--table</arg>
<arg>pin_charge_txn</arg>
<arg>--check-column</arg>
<arg>DATETIME</arg>
<arg>--last-value</arg>
<arg>2017-12-22 23:56:00</arg>
<arg>--target-dir</arg>
<arg>/user/thilina_02338/source/ideabiz_admin/pin/charge/data</arg>
<arg>--as-textfile</arg>
<arg>--fields-terminated-by</arg>
<arg>"|"</arg>
<arg>--null-string</arg>
<arg>\0</arg>
<arg>--null-non-string</arg>
<arg>\0</arg>
<arg>-m</arg>
<arg>1</arg>
<file>/user/oozie/share/lib/lib_20170927010640/sqoop/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</file>
<configuration />
</sqoop>
Let’s discuss how we can use incremental updates in a Sqoop job.
In the above job, the last value comes from the DATETIME column of the table. When you design the job using Oozie and schedule it with a coordinator, it can take the last updated value from the system time and compare it with the last DATETIME value in the internal table.
Now there should be a question in your mind: can’t we use some other column instead of DATETIME? Yes, you can. But then you have to find a way to get the last value of your external/internal table and use it with the scheduler. Hence, using a timestamp makes your life easier.
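As a sketch of what I mean, the --last-value argument in the workflow can read a workflow property instead of a hard-coded timestamp (here I call the property lastValue, which is just a name I picked), and the coordinator fills it in on every run (see the coordinator example at the end of this post):
<arg>--last-value</arg>
<arg>${lastValue}</arg>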
Now we know how to automate the Sqoop job. Let’s discuss how to define a complete workflow for the Sqoop incremental import. Below is the logic I used.
1. Create a separate folder path in HDFS to receive the data from the RDBMS.
Eg: /user/thilina_02338/source/ideabiz_admin/pin/charge/data
Each time you import data, all the files in this folder are removed first. This prevents data duplication in the external tables. In my scenario, I don’t have replica database servers for the RDBMS, so I split the import up table by table and schedule each table separately. This helps minimize the load on the relational database, and it is why I used a separate path for each table. The first time you define the workflow there is a lot of work, but it helps you rectify issues quickly later. A sample command to create the directory follows this step.
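Here is the one-off command I mean, creating the path from the example above (assuming you have write access to it):
hdfs dfs -mkdir -p /user/thilina_02338/source/ideabiz_admin/pin/charge/data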
For the queries below, I’m using the Hive UI. It can save the query and import it into the workflow as well. You can also draw a few graphs based on the results of the query.
2. Now create an external table and point it to the folder path used above. This brings all the data in that folder path into the external table.
CREATE EXTERNAL TABLE IF NOT EXISTS ext_pin_charge_txn (
  ID INT,
  DATETIME TIMESTAMP,
  APIMGR_ID INT,
  MSISDN VARCHAR(20),
  APP_REF VARCHAR(100),
  SERVER_REF VARCHAR(50),
  DESCRIPTION VARCHAR(500),
  CALLBACK_URL VARCHAR(500),
  AMOUNT DOUBLE,
  TAX INT,
  PIN VARCHAR(7),
  RETRY_COUNT INT,
  STATUS VARCHAR(20),
  PAYMENT_STATUS VARCHAR(20),
  PAYMENT_REF VARCHAR(50),
  RESENT_COUNT INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/user/thilina_02338/source/ideabiz_admin/pin/charge/data';
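After creating it, a quick query confirms that the external table can read the files Sqoop imported (this is only a sanity check, not part of the workflow itself):
SELECT * FROM ext_pin_charge_txn LIMIT 10;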
3. Create an internal table that contains partition keys.
CREATE TABLE IF NOT EXISTS ideamart.pin_charge_txn (
  ID INT,
  DATETIME TIMESTAMP,
  APIMGR_ID INT,
  MSISDN VARCHAR(20),
  APP_REF VARCHAR(100),
  SERVER_REF VARCHAR(50),
  DESCRIPTION VARCHAR(500),
  CALLBACK_URL VARCHAR(500),
  AMOUNT DOUBLE,
  TAX INT,
  PIN VARCHAR(7),
  RETRY_COUNT INT,
  STATUS VARCHAR(20),
  PAYMENT_STATUS VARCHAR(20),
  PAYMENT_REF VARCHAR(50),
  RESENT_COUNT INT
)
PARTITIONED BY (time_key INT);
4. Insert data into the internal table from the external table.
set hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO ideamart.pin_charge_txn PARTITION (time_key)
SELECT
  ID, DATETIME, APIMGR_ID, MSISDN, APP_REF, SERVER_REF, DESCRIPTION,
  CALLBACK_URL, AMOUNT, TAX, PIN, RETRY_COUNT, STATUS, PAYMENT_STATUS,
  PAYMENT_REF, RESENT_COUNT,
  CAST(FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(DATETIME AS STRING), 'yyyy-MM-dd HH:mm:ss'), 'yyyyMM') AS INT) AS time_key
FROM ext_pin_charge_txn;
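Once the insert finishes, you can verify that the monthly partitions were created and see how many rows landed in each (again, just a sanity check on my side):
SHOW PARTITIONS ideamart.pin_charge_txn;
SELECT time_key, COUNT(*) FROM ideamart.pin_charge_txn GROUP BY time_key;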
Job scheduling can be done using a coordinator application. The coordinator allows the user to execute recurrent and interdependent workflows.
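To give an idea of how that looks, here is a minimal coordinator sketch that runs the workflow once a day and passes the previous day’s nominal time in as the lastValue property mentioned earlier (the app-path, names, and dates are assumptions, not my actual configuration):
<coordinator-app name="pin_charge_txn_daily" frequency="${coord:days(1)}"
                 start="2017-12-23T00:00Z" end="2018-12-31T00:00Z" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.4">
    <action>
        <workflow>
            <!-- hypothetical path to the workflow definition in HDFS -->
            <app-path>hdfs://nameservice1/user/thilina_02338/workflows/pin_charge_import</app-path>
            <configuration>
                <property>
                    <!-- passed to the workflow and read by the --last-value argument -->
                    <name>lastValue</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), 'yyyy-MM-dd HH:mm:ss')}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>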
That’s it! Yayyyyy. Scroll down and comment if
you have any queries regarding incremental data import. Would love to help :)