0.13.0 (2025-02-24)
🎉 3 years since first release 0.1.0 🎉
Breaking Changes
- Add Python 3.13 support. (#298)
- Change the logic of FileConnection.walk and FileConnection.list_dir. (#327)
Previously limits.stops_at(path) == True was treated as “return the current file and stop”, which could lead to exceeding some limit. Now it means “stop immediately”.
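For example, a minimal sketch using the documented MaxFilesCount limit with FileDownloader, which relies on these methods when scanning the source directory:
from onetl.file import FileDownloader
from onetl.file.limit import MaxFilesCount

# previously the limit check behaved as "return the current file and stop",
# so in some cases more entries than expected could be returned;
# now scanning stops immediately once a limit reports a stop
downloader = FileDownloader(
    ...,
    limits=[MaxFilesCount(100)],
)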
- Change the default value of FileDFWriter.Options(if_exists=...) from error to append, to make it consistent with other .Options() classes within onETL. (#343)
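To keep the previous behavior, pass the old value explicitly; a minimal sketch (the connection, format and target_path values are placeholders):
from onetl.file import FileDFWriter

writer = FileDFWriter(
    connection=...,
    format=...,
    target_path="/some/path",
    options=FileDFWriter.Options(if_exists="error"),  # restore the pre-0.13 default
)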
Features
- Add support for FileModifiedTimeHWM HWM class (see etl-entities 2.5.0):
from etl_entities.hwm import FileModifiedTimeHWM

from onetl.file import FileDownloader
from onetl.strategy import IncrementalStrategy

downloader = FileDownloader(
    ...,
    hwm=FileModifiedTimeHWM(name="somename"),
)

with IncrementalStrategy():
    downloader.run()
- Introduce FileSizeRange(min=..., max=...) filter class. (#325)
Now users can set up FileDownloader / FileMover to download/move only files within a specific size range:
from onetl.file import FileDownloader
from onetl.file.filter import FileSizeRange

downloader = FileDownloader(
    ...,
    filters=[FileSizeRange(min="10KiB", max="1GiB")],
)
- Introduce TotalFilesSize(...) limit class. (#326)
Now users can set up FileDownloader / FileMover to stop downloading/moving files after reaching a certain amount of data:
from onetl.file import FileDownloader
from onetl.file.limit import TotalFilesSize

downloader = FileDownloader(
    ...,
    limits=[TotalFilesSize("1GiB")],
)
- Introduce FileModifiedTime(since=..., until=...) file filter. (#330)
Now users can set up FileDownloader / FileMover to download/move only files with a specific modification time:
from datetime import datetime, timedelta

from onetl.file import FileDownloader
from onetl.file.filter import FileModifiedTime

downloader = FileDownloader(
    ...,
    filters=[FileModifiedTime(until=datetime.now() - timedelta(hours=1))],
)
- Add SparkS3.get_exclude_packages() and Kafka.get_exclude_packages() methods. (#341)
Using them allows skipping the download of dependencies which are not required by the specific connector, or which are already a part of Spark/PySpark:
from pyspark.sql import SparkSession

from onetl.connection import SparkS3, Kafka

maven_packages = [
    *SparkS3.get_packages(spark_version="3.5.4"),
    *Kafka.get_packages(spark_version="3.5.4"),
]
exclude_packages = SparkS3.get_exclude_packages() + Kafka.get_exclude_packages()

spark = (
    SparkSession.builder.appName("spark_app_onetl_demo")
    .config("spark.jars.packages", ",".join(maven_packages))
    .config("spark.jars.excludes", ",".join(exclude_packages))
    .getOrCreate()
)
Improvements
- All DB connections opened by JDBC.fetch(...), JDBC.execute(...) or JDBC.check() are immediately closed after the statement is executed. (#334)
Previously a Spark session with master=local[3] actually opened up to 5 connections to the target DB: one for JDBC.check(), another for the Spark driver's interaction with the DB (e.g. creating tables), and one for each Spark executor. Now at most 4 connections are opened, as JDBC.check() no longer holds an open connection.
This is important for RDBMS like Postgres or Greenplum, where the number of connections is strictly limited and the limit is usually quite low.
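For illustration, a minimal sketch assuming a Postgres connection and an existing SparkSession (constructor values are placeholders):
from onetl.connection import Postgres

postgres = Postgres(
    host="postgres.domain.com",
    user=...,
    password=...,
    database=...,
    spark=spark,
)

postgres.check()                         # the connection opened for the check is closed immediately
df = postgres.fetch("SELECT version()")  # same for fetch: the connection is closed after the statement is executed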
- Set up ApplicationName (client info) for Clickhouse, MongoDB, MSSQL, MySQL and Oracle. (#339, #248)
Also update the ApplicationName format for Greenplum, Postgres, Kafka and SparkS3.
Now all connectors have the same ApplicationName format: ${spark.applicationId} ${spark.appName} onETL/${onetl.version} Spark/${spark.version}
The only connections not sending ApplicationName are Teradata and the FileConnection implementations.
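As an illustration only (a sketch assuming the postgres connection object from the example above and grants to query pg_stat_activity), the client info can be checked on the DB side:
# application_name in pg_stat_activity should now follow the format described above
df = postgres.fetch("SELECT pid, application_name FROM pg_stat_activity")
df.show(truncate=False)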
- Now DB.check() tests connection availability not only on the Spark driver, but also from a Spark executor. (#346)
This allows failing immediately if the Spark driver host has network access to the target DB, but the Spark executors do not.
#### NOTE
Now Greenplum.check() requires the same user grants as DBReader(connection=greenplum):
-- yes, "writable" for reading data from GP, it's not a mistake
ALTER USER username CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');
-- for both reading and writing to GP
-- ALTER USER username CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist') CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');
Please ask your Greenplum administrators to provide these grants.
Bug Fixes
- Avoid suppressing Hive Metastore errors while using DBWriter. (#329)
Previously this was implemented as:
try:
    spark.sql(f"SELECT * FROM {table}")
    table_exists = True
except Exception:
    table_exists = False
If the Hive Metastore was overloaded and responded with an exception, the table was treated as non-existing, resulting in a full table overwrite instead of appending or overwriting only a subset of partitions.
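A minimal sketch of a more robust check, shown for illustration only using the public Spark catalog API (the actual implementation inside onETL may differ):
# no try/except here: Metastore errors propagate instead of being treated
# as "the table does not exist"
table_exists = spark.catalog.tableExists(table)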
- Fix using onETL to write data to PostgreSQL or Greenplum instances behind pgbouncer with pool_mode=transaction. (#336)
Previously Postgres.check() opened a read-only transaction, pgbouncer changed the entire connection type from read-write to read-only, and then DBWriter.run(df) was executed on a read-only connection, producing errors like:
org.postgresql.util.PSQLException: ERROR: cannot execute INSERT in a read-only transaction
org.postgresql.util.PSQLException: ERROR: cannot execute TRUNCATE TABLE in a read-only transaction
A workaround was added by passing readOnly=True to the JDBC params of read-only connections, so pgbouncer can properly distinguish read-only from read-write connections.
After upgrading to onETL 0.13.x or higher, the same error may still appear if pgbouncer still holds read-only connections and returns them for DBWriter. To fix this, the user can manually convert a read-only connection to read-write:
postgres.execute("BEGIN READ WRITE;")  # <-- add this line
DBWriter(...).run(df)
After all connections in the pgbouncer pool have been converted from read-only to read-write and the error is gone, this additional line can be removed.
See the Postgres JDBC driver documentation.
- Fix MSSQL.fetch(...) and MySQL.fetch(...) opening a read-write connection instead of a read-only one. (#337)
Now this is fixed:
  * MSSQL.fetch(...) establishes the connection with ApplicationIntent=ReadOnly.
  * MySQL.fetch(...) executes a SET SESSION TRANSACTION READ ONLY statement.
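For illustration, a minimal sketch (constructor values and the table name are placeholders):
from onetl.connection import MySQL

mysql = MySQL(host=..., user=..., password=..., database=..., spark=spark)

# fetch() runs a small query on the Spark driver;
# since this fix the session it opens is read-only
df = mysql.fetch("SELECT COUNT(*) AS cnt FROM some_table")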
- Fixed passing multiple filters to FileDownloader and FileMover. (#338)
It was caused by sorting the filters list in an internal logging method, while FileFilter subclasses are not sortable.
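A minimal sketch of such a call, combining two documented filter classes (the filter values are arbitrary):
from onetl.file import FileDownloader
from onetl.file.filter import FileSizeRange, Glob

# passing more than one filter previously failed inside the logging code;
# it now works as expected
downloader = FileDownloader(
    ...,
    filters=[Glob("*.csv"), FileSizeRange(max="1GiB")],
)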
- Fix a false warning about a lot of parallel connections to Greenplum. (#342)
Creating a Spark session with .master("local[5]") may open up to 6 connections to Greenplum (= number of Spark executors + 1 for the driver), but onETL instead used the number of CPU cores on the host as the number of parallel connections.
This led to a false warning that the number of Greenplum connections is too high, which should actually be shown only if the number of executors is higher than 30.
- Fix MongoDB trying to use the current database name as authSource. (#347)
Now the connector's default value is used, which is the admin database. In previous onETL versions this could be fixed by setting it explicitly:
from onetl.connection import MongoDB

mongodb = MongoDB(
    ...,
    database="mydb",
    extra={
        "authSource": "admin",
    },
)
Dependencies
- Minimal etl-entities version is now 2.5.0. (#331)
- Update DB connectors/drivers to latest versions: (#345)
  * Clickhouse 0.6.5 → 0.7.2
  * MongoDB 10.4.0 → 10.4.1
  * MySQL 9.0.0 → 9.2.0
  * Oracle 23.5.0.24.07 → 23.7.0.25.01
  * Postgres 42.7.4 → 42.7.5
Doc only Changes
- Split large code examples into tabs. (#344)