Concepts

Here you can find detailed documentation about each one of the onETL concepts and how to use them.

Connection

Basics

onETL is used to pull data from and push data to other systems, and so it has a first-class Connection concept for storing credentials that are used to communicate with external systems.

A Connection is essentially a set of parameters, such as username, password, and hostname.

To create a connection to a specific storage type, you must use a class that matches the storage type. The class name is the same as the storage type name (Oracle, MSSQL, SFTP, etc.):

from onetl.connection import SFTP

sftp = SFTP(
    host="sftp.test.com",
    user="onetl",
    password="onetl",
)

All connection types are inherited from the parent class BaseConnection.
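
Every connection class can therefore be used wherever a BaseConnection is expected. A minimal sketch (assuming BaseConnection is importable from onetl.base):

from onetl.base import BaseConnection
from onetl.connection import SFTP

sftp = SFTP(
    host="sftp.test.com",
    user="onetl",
    password="onetl",
)

# every connection class, including SFTP, is a subclass of BaseConnection
assert isinstance(sftp, BaseConnection)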

Class diagram

DBConnection

Classes inherited from DBConnection can be used to access databases.

A DBConnection can be instantiated as follows:

from onetl.connection import MSSQL

mssql = MSSQL(
    host="mssqldb.demo.com",
    user="onetl",
    password="onetl",
    database="Telecom",
    spark=spark,
)

where spark is the current SparkSession. onETL uses Spark and specific Java connectors under the hood to work with databases.
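
For example, a SparkSession with the required Java connector packages may be created like this (a sketch, assuming your onETL version provides the MSSQL.get_packages() helper; see the Spark-related documentation for exact package names and versions):

from pyspark.sql import SparkSession

from onetl.connection import MSSQL

# ask Spark to download the MSSQL JDBC connector while creating the session
spark = (
    SparkSession.builder
    .appName("onetl-demo")
    .config("spark.jars.packages", ",".join(MSSQL.get_packages()))
    .getOrCreate()
)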

For a description of other parameters, see the documentation for the available DBConnections.

FileConnection

Classes inherited from FileConnection can be used to access files stored on different file systems/file servers.

A FileConnection can be instantiated as follows:

from onetl.connection import SFTP

sftp = SFTP(
    host="sftp.test.com",
    user="onetl",
    password="onetl",
)

For a description of other parameters, see the documentation for the available FileConnections.

FileDFConnection

Classes inherited from FileDFConnection can be used to access files as Spark DataFrames.

A FileDFConnection can be instantiated as follows:

from onetl.connection import SparkHDFS

spark_hdfs = SparkHDFS(
    host="namenode1.domain.com",
    cluster="mycluster",
    spark=spark,
)

where spark is the current SparkSession. onETL uses Spark and specific Java connectors under the hood to work with DataFrames.

For a description of other parameters, see the documentation for the available FileDFConnections.

Checking connection availability

Once you have created a connection, you can check whether the database/filesystem is actually available using the check() method:

mssql.check()
sftp.check()
spark_hdfs.check()

It will raise an exception if the database/filesystem cannot be accessed.
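
If an unavailable system is an expected situation, you can handle it yourself (a sketch; the exact exception type depends on the connection, so a broad except is used here):

try:
    sftp.check()
except Exception:
    # e.g. wrong credentials or the host is unreachable
    print("SFTP is not available, skipping this source")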

This method returns the connection itself, so you can create a connection and immediately check its availability:

mssql = MSSQL(
    host="mssqldb.demo.com",
    user="onetl",
    password="onetl",
    database="Telecom",
    spark=spark,
).check()  # <--

Extract/Load data

Basics

As we said above, onETL is used to extract data from and load data into remote systems.

onETL provides several classes for this: DBReader, DBWriter, FileDFReader, FileDFWriter, FileDownloader, FileUploader and FileMover (see the tables below).

All of these classes have a run() method that starts extracting/loading the data:

from onetl.db import DBReader, DBWriter

reader = DBReader(
    connection=mssql,
    source="dbo.demo_table",
    columns=["column_1", "column_2"],
)

# Read data as Spark DataFrame
df = reader.run()

# hive is a Hive connection, created in the same way as mssql above
db_writer = DBWriter(
    connection=hive,
    target="dl_sb.demo_table",
)

# Save Spark DataFrame to Hive table
db_writer.run(df)

Extract data

To extract data, you can use the following classes:

Class          | Use case                                      | Connection            | run() gets                                         | run() returns
DBReader       | Reading data from a database                  | Any DBConnection      | No input                                           | Spark DataFrame
FileDFReader   | Reading data from a file or set of files      | Any FileDFConnection  | No input, or List[File path on FileSystem]         | Spark DataFrame
FileDownloader | Downloading files from remote FS to local FS  | Any FileConnection    | No input, or List[File path on remote FileSystem]  | DownloadResult
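
For example, a minimal FileDownloader run (directory paths here are illustrative) may look like this, reusing the sftp connection created above:

from onetl.file import FileDownloader

downloader = FileDownloader(
    connection=sftp,
    source_path="/remote/dir",
    local_path="/local/dir",
)

# download all files from /remote/dir on SFTP to /local/dir on the local FS
download_result = downloader.run()

# DownloadResult groups file paths by status, e.g. successfully downloaded ones
print(download_result.successful)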

Load data

To load data, you can use the following classes:

Class        | Use case                                        | Connection            | run() gets                           | run() returns
DBWriter     | Writing data from a DataFrame to a database     | Any DBConnection      | Spark DataFrame                      | None
FileDFWriter | Writing data from a DataFrame to a folder       | Any FileDFConnection  | Spark DataFrame                      | None
FileUploader | Uploading files from a local FS to a remote FS  | Any FileConnection    | List[File path on local FileSystem]  | UploadResult
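
For example, a minimal FileUploader run (paths are illustrative) may look like this, passing an explicit list of local files:

from onetl.file import FileUploader

uploader = FileUploader(
    connection=hdfs,
    target_path="/remote/dir",
)

# upload specific local files to /remote/dir in HDFS, returns UploadResult
upload_result = uploader.run(
    [
        "/local/dir/file1.csv",
        "/local/dir/file2.csv",
    ]
)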

Manipulate data

To manipulate data, you can use the following classes:

Class     | Use case                                          | Connection          | run() gets                            | run() returns
FileMover | Moving files between directories on a remote FS   | Any FileConnection  | List[File path on remote FileSystem]  | MoveResult
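
For example, a minimal FileMover run (paths are illustrative) may look like this:

from onetl.file import FileMover

mover = FileMover(
    connection=hdfs,
    source_path="/remote/source",
    target_path="/remote/archive",
)

# move all files from /remote/source to /remote/archive within the same HDFS, returns MoveResult
move_result = mover.run()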

Options

Extract and load classes have an options parameter, which has a special meaning:

  • all other parameters - WHAT we extract / WHERE we load to
  • options parameter - HOW we extract/load data

from onetl.connection import MSSQL, Hive
from onetl.db import DBReader, DBWriter
from onetl.file import FileDownloader, FileUploader, FileMover, FileDFReader, FileDFWriter
from onetl.file.filter import Glob
from onetl.file.limit import MaxFilesCount
from onetl.file.format import CSV

# the mssql, hive, sftp, hdfs and s3 connections are created as described above

db_reader = DBReader(
    # WHAT do we read:
    connection=mssql,
    source="dbo.demo_table",  # some table from MSSQL
    columns=["column_1", "column_2"],  # but only a specific set of columns
    where="column_2 > 1000",  # only rows matching the clause
    # HOW do we read:
    options=MSSQL.ReadOptions(
        numPartitions=10,  # read in 10 parallel jobs
        partitionColumn="id",  # balance data between jobs using a `hash(id) mod N` expression
        partitioningMode="hash",
        fetchsize=1000,  # each job fetches blocks of 1000 rows on every read attempt
    ),
)

db_writer = DBWriter(
    # WHERE do we write to - some table in Hive
    connection=hive,
    target="dl_sb.demo_table",
    # HOW do we write - overwrite all the data in the existing table
    options=Hive.WriteOptions(if_exists="replace_entire_table"),
)

file_downloader = FileDownloader(
    # WHAT do we download - files from some dir on SFTP
    connection=sftp,
    source_path="/source",
    filters=[Glob("*.csv")],  # only CSV files
    limits=[MaxFilesCount(1000)],  # 1000 files max
    # WHERE do we download to - a specific dir on the local FS
    local_path="/some",
    # HOW do we download:
    options=FileDownloader.Options(
        delete_source=True,  # after downloading each file, remove it from source_path
        if_exists="replace_file",  # replace existing files in the local_path
    ),
)

file_uploader = FileUploader(
    # WHAT do we upload - files from some local dir
    local_path="/source",
    # WHERE do we upload to - a specific remote dir in HDFS
    connection=hdfs,
    target_path="/some",
    # HOW do we upload:
    options=FileUploader.Options(
        delete_local=True,  # after uploading each file, remove it from local_path
        if_exists="replace_file",  # replace existing files in the target_path
    ),
)

file_mover = FileMover(
    # WHAT do we move - files in some remote dir in HDFS
    source_path="/source",
    connection=hdfs,
    # WHERE do we move files to
    target_path="/some",  # a specific remote dir within the same HDFS connection
    # HOW do we move - replace existing files in the target_path
    options=FileMover.Options(if_exists="replace_file"),
)

file_df_reader = FileDFReader(
    # WHAT do we read - *.csv files from some dir in S3
    connection=s3,
    source_path="/source",
    file_format=CSV(),
    # HOW do we read - load files from /source/*.csv, not from /source/nested/*.csv
    options=FileDFReader.Options(recursive=False),
)

file_df_writer = FileDFWriter(
    # WHERE do we write to - .csv files in some dir in S3
    connection=s3,
    target_path="/target",
    file_format=CSV(),
    # HOW do we write - replace all existing files in /target, if any
    options=FileDFWriter.Options(if_exists="replace_entire_directory"),
)

More information about options can be found in the DB connection and File Downloader / File Uploader / File Mover / FileDF Reader / FileDF Writer documentation.

Read Strategies

onETL has several built-in strategies for reading data:

  1. Snapshot strategy (default strategy)
  2. Incremental strategy
  3. Snapshot batch strategy
  4. Incremental batch strategy

For example, an incremental strategy allows you to get only new data from the table:

from time import sleep

from onetl.strategy import IncrementalStrategy

reader = DBReader(
    connection=mssql,
    source="dbo.demo_table",
    hwm_column="id",  # detect new data based on the value of the "id" column
)

# first run
with IncrementalStrategy():
    df = reader.run()

sleep(3600)

# second run
with IncrementalStrategy():
    # only rows that appeared in the source since the previous run
    df = reader.run()

or get only files which were not downloaded before:

from time import sleep

from onetl.file import FileDownloader
from onetl.strategy import IncrementalStrategy

file_downloader = FileDownloader(
    connection=sftp,
    source_path="/remote",
    local_path="/local",
    hwm_type="file_list",  # save all downloaded files to a list, and exclude files already present in this list
)

# first run
with IncrementalStrategy():
    files = file_downloader.run()

sleep(3600)

# second run
with IncrementalStrategy():
    # only files that appeared in the source since the previous run
    files = file_downloader.run()
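
Batch strategies (items 3 and 4 in the list above) split one read into several sequential runs. A sketch of a snapshot batch read, assuming the iteration-based API described in the strategy documentation, and reusing the mssql and hive connections from above:

from onetl.strategy import SnapshotBatchStrategy

reader = DBReader(
    connection=mssql,
    source="dbo.demo_table",
    hwm_column="id",
)

db_writer = DBWriter(
    connection=hive,
    target="dl_sb.demo_table",
)

# read the table in chunks covering at most 100_000 values of "id" per run
with SnapshotBatchStrategy(step=100_000) as batches:
    for _ in batches:
        df = reader.run()
        db_writer.run(df)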

Most strategies are based on HWM. Please check each strategy's documentation for more details.

Why not just use the Connection class for extract/load?

Connections are very simple: they provide only a set of basic operations, like mkdir, remove_file, get_table_schema, and so on.

High-level operations, like:

  • Read Strategies support
  • handling metadata push/pull
  • handling different options, like if_exists="replace_file" in case of file download/upload

are moved to separate classes which call the connection object's methods to perform the more complex logic.