Friday, July 10, 2020

Sending data to Amazon s3 buckets using ODI - s3 to ODI

Recently I've come across with a task to upload data from Oracle Data Integrator to amazon s3 buckets.

This demand comes for many reasons, but the main one is the trend of using cloud storage for data.

ODI is a very powerful middleware and can be used to integrate data with amazon s3. Although cards should be open, this tool is not native with amazon, and I have found a workaround to implement this.

Sending data to s3 buckets via ODI can be achieved in two main ways.


  • Using Amazon CLI commands in ODI
  • Using RESTful API to send data to from ODI to s3 bucket.  
This blog post will cover the first one because it is much faster and easier to implement. For the second one, videos within youtube can be found how to invoke RESTful API in ODI.


Firstly Amazon CLI should be installed in the desired environment where ODI is being used.
To install Amazon CLI, head over to the following link:


Having downloaded the installer, click on it and continue with the simple installation within the system.

I am using windows, so to test whether Amazon CLI has been properly installed, run this command within command prompt of windows:

  • aws --version
Testing Amazon CLI deployment

If the above text displays, it means that Amazon CLI has been successfully installed within the environment.


Now that Amazon CLI is installed, we should configure the s3 bucket connection with amazon CLI.

An existing s3 bucket is needed, for the sake of this post, I will create a new one in aws console:


Bucket name: "bucketname"
Region: eu-central-1

After creating this bucket, you should generate the AWS Access Key ID and AWS Secret Access key, to do that please watch this tutorial since I wont be covering that:


Once the keys are generated, head over to CMD to configure aws connection with Amazon CLI

To do that, firstly type the following command in cmd:

  • aws configure
And continue with keys, region inputs:

aws configure

Having configured aws connection, amazon CLI commands have to be used in order to achieve successful upload to s3 from ODI.

The following command copies testfile.txt from my environment to the bucket desired
  • aws s3 cp C:\Users\lori\Desktop\testfile.txt s3://bucketname

If you want to copy a whole folder, and create that folder within s3 bucket dynamically, use the following command:

  • aws s3 cp C:\Users\lori\Desktop\testfolder s3://bucketname/testfolder --recursive

The same can be achieve with commands to download files from s3 to an environment using ODI with amazon CLI commands, but I won't cover that in this blog post.

Now head over to ODI to configure the package for amazon CLI command invocation.


Create a package, and drag an OdiOSCommand object, this object can be used to call Operating system commands from ODI middleware:


  • On the command to execute section, copy the amazon CLI command desired.

  • Error file: use a random name, it will create a file in case of an error. **It is used for error tracing.

  • Working directory: specify the path where Amazon CLI is installed in your system.


Execute the command and head over to s3 bucket, you can observe the file uploaded in s3:


Uploaded file




As you can imagine, this can be extended to far wider application for automatic BI purposes, also ODI variables can be used.

I hope it helps!





Saturday, April 4, 2020

Waiting for specific data to proceed with action using open tools in ODI 11g

Today I will be demonstrating the basic usage of a very cool feature which ODI 11g offers through ODI Open Tools palette in a package designer.

This cool tool includes:

  • OdiWaitForData

Imagine you are asked to automate an email and notify some 3rd party that specific data has been populated in a specific table. Furthermore multiple tables, furthermore multiple filters.

This "automatic querying" can be easily achieved by using OdiWaitForData. As the name pretty much describes it, it will poll the table for specific data in specific amount of time and specific intervals, all which can be given through parameters!

For the purpose of demonstration my use case will include:

  • Check a table named PHONE_CONTACT
  • Check if the column STATE has been populated with a value like '%KOS%'
  • Poll this table every 30 seconds
  • If the condition is matched, send an email.
  • If the condition is not matched, run indefinitely until the condition is matched.

Go to the desired project, under the package section create a new package. Proceed to diagram tab inside the package select 2 of elements which I will be using for today's use case.

WaitForData

The oracle table which I will poll is named PHONE_CONTACT, and I will check for a record consisting the string '%KOS%' in the column STATE of this table.

PHONE_CONTACT Oracle table used for polling

Let's dig in into OdiWaitForData parameters, click on the open tool element and start filling the parameters:

OdiWaitForData parameters
- Context - provide the context (I hid it due to privacy reasons)
- Global Row Count - The amount of rows to meet the criteria of consisting STATE like '%KOS%'
- Polling Interval (milliseconds) - check the table every 30 seconds
- SQL Filter - STATE LIKE '%KOS%'
- Table Name - PHONE_CONTACT
- Timeout - if 0, the polling will continue indefinitely until the criteria is matched

**If multiple table's need to be checked at once, or say for example the row count has to be compared with a specific variable, check this documentation for a detailed usage of ODI's open tools:

Now let's fill in OdiSendMail's parameters:


Now let's execute the package and see how it's working:


Inserting this row, the package should not stop and continue to poll the table because the state does not meet the criteria.

30 Seconds have passed and the package is still running, so it should be working good:


Now let's insert a row which matches the criteria to proceed with email:


Inserting this record and waiting for 30 seconds, an email should arrive, notifying the destination that the table has met the conditions needed!



Saturday, March 21, 2020

Creating a Custom Integration Knowledge Module to Backup Table before inserting new rows - ODI11g

Today I will be modifying an existing IKM SQL Control Append and implement + test it in ODI11g.


*This case study was demonstrated via ODI Substitution API in oracle's website, for further references direct to this link:*



To implement this, I will duplicate the original IKM SQL Control Append Knowledge Module and add additional steps.

2 Additional steps will be added to the Knowledge Module:


  • Drop Backup Table (if it already exists one);
  • Create Backup Table of the target's table before inserting new rows, tables will be suffixed with 'Target table's name'_BKP.

I will name this IKM as 'IKM SQL CONTROL APPEND/ wCreateBKP':

No need to specify target technology.


In Details tab, each step of the knowledge module is specified, and the Order is ranked in a top down approach, so the upper most step starts the first, and so on.

Typically the additional step for Dropping an existing BKP_ table with the target's table name should be the first step in this knowledge module, additionally after this step we want to Create a BKP_ prefixed table with all the existing records in the target table.

So the dependency here is that both of the steps should execute before Truncate target table or Delete target table step. Hence I will add both of these steps as the first to execute in the KM.


The typical SQL query for dropping a table in Oracle is:

  • Drop table 'Table_name'_BKP;

And the typical SQL query for creating a table postfixed with '_BKP' as a copy of another table including rows is:

  • Create table 'Table_name'||'_BKP' as Select * from 'Table_name';


ODI KM's use substitution API's. As cited in oracle's website (link above): KMs are written as templates by using the Oracle Data Integrator substitution API. The API methods are java methods that return a string value. They all belong to a single object instance named "odiRef". The same method may return different values depending on the type of KM that invokes it. That's why they are classified by type of KM.


Query:

  • Drop table 'Table_name'_BKP;
  • Drop table <%=odiRef.getTable("L","TARG_NAME","A")%>_BKP
And,

  • Create table 'Table_name'||'_BKP' as Select * from 'Table_name';
  • Create table <%=odiRef.getTable("L","TARG_NAME","A")%>_BKP as select * from <%=odiRef.getTable("L","TARG_NAME","A")%>

Now that we have the substitution API's ready, create two additional steps in Details tab of KM's using the green plus '+' on the top right corner, and rearrange the order using the up and down arrows on the top right corner:

New steps in KM

Note that when Dropping the backup table (if existing) the checkbox for ignore errors is marked, meaning that if the _BKP backup table doesn't already exist, do not throw an error and proceed with Creating it.

Open the step for Drop Backup Table and make sure to check that box and paste the substitution API's code under Command on Target section. No need to specify Technology of Context whatsoever.

Drop Backup Table (if existing)

Navigate to Create Backup Table step, this time I won't check Ignore Errors checkbox because I wan't to throw an error whenever a table cannot be created, make sure to paste the code under Command on Target Section.


Create Backup Table


Now that both of these steps are created, I want to create two Options under this Knowledge module, so if I want to disable this feature of the KM, I can do that by selecting it as False.

Navigate to Knowledge Modules in Designer Tab, Expand Knowledge Modules, expand IKM SQL CONTROL APPEND/ wCreateBKP and right click on it to add a new option:

New Option



When Creating the options, check for Position, the Options Position should refer to the Order position of the step inside Details tab, in my case Dropping the Backup table should have position 10 and Creating it should have position 20.

Edit Option

Edit Option

Now that we have the Knowledge module in place, it is time to test it.

To demonstrate it's execution and step I will use a simple interface which uses a file to populate an Oracle table.

Under flow tab of this interface, I will choose the custom IKM we've built, and I will make sure that DROP_BACKUP_TBL option and CREATE_BACKUP_TBL options are selected as true (which it should be by default) and Truncate as True.

Flow

It is time to execute the interface and check the steps.

Execution steps
As you can observe, Drop Backup Table has a warning because a previous table with _BKP name did not exist, and then Create Backup Table executed correctly.

If I run it again, it won't show any warning on Drop Backup Table because now a _BKP table exists:

Execution steps





Wednesday, March 4, 2020

How to perform Group By in ODI11g interfaces

Aggregation is a must in Data Warehousing and Data Integration, as of ODI 12c there exists a designated tool for Group by operations, but in ODI 11g you have to perform a workaround.

Aggregate functions in SQL include:

MIN
MAX
SUM
COUNT
AVG

To perform a query with the above functions, a group by statement has to be used in order to aggregate data.


I will be using a random source file created randomly with ID and different numerical attributes:


Source file set


And a target table created manually in Oracle with identical attributes such as source file:

Target table


Now I will create an interface (GROUP_BY_EXP) which uses the data store's created from above file and table:

Initial Interface


Assuming that I want to find the SUM of Height grouped by ID only, typically this will be achieved with this query:


Mapping only ID and Height in Mappings expressions this is pretty easy:
Mapping Expressions


Executing this interface will generate this code, which indeed is exactly what we expect it to generate:
Code generated in first run

What if we map all of the columns on the target table? And we need the same task Sum(Height) and Group by ID? 







Let's firstly map all of the columns:
Mapping Expressions
Let's execute it:

Code generated in second run

Now this brings an issue because in our case we need only ID used in Group By expression.

In order to avoid this, each of unneeded columns in group by statement: NAME, LENGTH, WIDTH should have aggregate function used in expression, if that does not pose a problem in whatever you are using, then encapsulate each column with MIN/MAX:


Mapping Expressions
Let's execute it:

Code generated in third run
As you can see NAME, HEIGHT and WIDTH are removed from Group By statement whenever there's an aggregate function used on them.

Note: This is a workaround, data should be analyzed after the execution due to the fact that MIN/MAX functions might generate data which can cause trouble on your business logic.






Tuesday, February 18, 2020

ODI 11g - Updating existing records - Inserting new ones (based upon a key)

Today's post will be a simple but very useful business case in Data Integration and Data warehousing purposes.

- Populate a table will new records
- If the record is fully duplicate, reject it.
- If the record is a duplicate on a key, but there is a particular field changing, update that record

In Oracle Data Integrator 11g this is easily achieved through an Integration Knowledge Module known as IKM Oracle Incremental Update (MERGE) which comes built in with ODI11g.

To use it, there must be a key declared either in database side, or within ODI.

The example which I will cover will include declaring a UK within ODI.

-  Firstly make sure that source datastore and target datastores are created in ODI.







I reverse engineered this table inside ODI:








Create a new Interface and define source and target:






As mentioned above, a key has to be declared, in my case, an Employee is uniquely defined with EMPID, so EMPID is the column which will be used to determine whether a record exists or not!


Navigate to Quick-Edit tab and define EMPID as UK:




Now we are almost set, but we also need to define the knowledge module in the Flow tab:



Click on top of SrcSet (In my case it is a file so an LKM is needed to load it into an Oracle RDBMS table) - Select LKM File to SQL


Click on top of Target - Select IKM Oracle Incremental Update (Merge) specifically.


Make sure that you disable Flow Control (False) because CKM is not being used and it will throw an error otherwise:





The target table is currently empty, in the first run every record in the source set will be inserted, run it and insert every record if there is any:










Expanding Merge rows step, it can be seen that all of the 7 records have been inserted:




























Saturday, November 9, 2019

Automating email reports using ODI Packages - Accessing Data Sources with changing name mask using ODI Variables

A very important feature which ODI offers is ODI Packages. ODI offers many elements which can be used to automate different processes.

Today I will try to demonstrate the usage of ODI Packages, scheduling runs for those packages and using variables to dynamically access different data sources every day.

In the end, different statistics and automatic emails can be generated to achieve basically everything which can be achieved through an SQL Query in a database :).

Scenario:

- Read a .TXT file which name changes everyday with current date. (test20190811.txt)
- Populate an oracle table everyday except for Saturday and Sunday
- Generate a .csv file from that table with name and date mask (test20190811.csv)
- Attach that file to an email and send that email everyday to a recipient.
- Show row number of the populated table in email.



Firstly we need to create a variable which obtains the value of current date.
That can be done through a simple query in Oracle:

SELECT to_char(sysdate,'YYYYMMDD') from dual;

The variable will be named VAR1, and the Schema provided can be any schema since I am not selecting a particular table but I am selecting from dual which is a built in oracle table. (Otherwise if you need to select from a specific table, the schema where that table is created has to be provided).




Make sure to test the value of the variable using the refresh logo on top right corner, after clicking it, navigate to history tab on the left top corner and check the value if it corresponds to today's date and desired format (in my case YYYYMMDD).


Now that we have created the variable which will do the work, we have to create a new interface which will read the file and populate an non-existing oracle table which will be created during the first run of the interface.

As mentioned, I will be reading a source file .txt named test20190811 (date is the file mask which will change everyday according to current date). Create a new data store which will be used as a source in the interface, make sure that at Resource Name, give the static part of the name, and then for file mask address the variable created, variables in ODI are addressed using "#", in this case #VAR1.

Make sure to write this part manually and don't use the search icon on the right:



After that, define file format in Files tab and make sure to add Columns manually using + sign in Columns tab. (It is not possible to reverse engineer the file because RKM will not obtain the variable value as of 11g.)

After defining manually all of the columns, create a new interface and add the created data store as source, right click on it and Add to target. All of existing columns from source will be mapped into target. 



(Note that the table can be created in database manually and reverse engineered as a new data store and dragged into target tab).

In my case LKM will be assigned LKM File to SQL, IKM SQL Control Append.

Run the interface and check the operator:


Now that we have the interface which loads (File to table) we have to generate a new .csv file containing data from this table and send it as an email attachment and provide statistics within email for every load.

In one of my previous posts, I have shown how to generate a file from an Oracle table, I will generate a .xlsx using ODI package elements this time which is quite interesting.

Let's assemble our elements in a package.

Create a new package, I will name it EMAIL_AUTOMATION.

The first element that I will drag is VAR1 variable, we have to refresh that variable everyday (rememberthat variables have to be refreshed, otherwise they will keep the old value), due to this fact we need to load a new file which updates is name mask everyday so a refreshing is a must:

Drag VAR1 into the diagram, it will be set as a Refresh Variable, which is exactly what we need.



After having refreshed the variable, interface has to be run, we assume that the file will be generated in the path where Interface reads the source file (If it doesn't, we can use ODI File Copy from Toolbox provided and specify the copy path before running the interface but I won't be covering that topic). Drag the interface into the diagram. Connect both points using the Ok arrow which indicates that if one step executes correctly continue with the other step:



After this step file generation is required, I will do that not using an interface but using ODI SQL Unload Element from toolbox,

-Target File (full path of the file which will be generated (I want to name it file#VAR1.csv))
-JDBC Driver: oracle.jdbc.OracleDriver
-JDBC URL: jdbc:oracle:thin@<>:<>;<> (provide database info)
-User: Provide User
-Password: Provide Password
-File Format: Delimited
-SQL QUERY: (select * from <MYSCHEMA>.TARGETTEST), this is the query which "decides" what to output in your file.

Right click on ODI Sql Unload step and select execute step to test it.



Go to the path specified to generate the file and open it:



Having done this correctly you can see the data from the table.


Let's create a new variable which counts the rows in the table, using this query:

SELECT COUNT(*) FROM <MYSCHEMA>.TARGETTEST

Drag this variable into package and make sure to refresh it before the interface, otherwise the row count will not be updated:



As you can see another variable refresh has been added and also an ODISendMail step from Internet folder inside the toolbox on the left, now we need to fill the ODISendMail parameters as below:



- Mail Server (ip address of the mail server)
- From: Any valid email
- To: Any valid email
- Attachment: Full path of the file generated + variable which obtains the dynamically changing mask
- Message Body which consists the date of load, table row count as below:





Save the package and run it (check operator with all of the steps finished):



Wait for the email:




Now lets add a schedule to this package, to do so we must generate a package scenario and also have an existing ODI Agent running.

Generate a scenario under package (Scenario is a frozen code)



Expand the new scenario generated, right click on Scheduling and select new scheduling:





Many constraints can be added, I want it to run every day of the week 7:28 AM.