Big Data with PostgreSQL and ADBC?

Your only resources are a Linux machine, Python, and a PostgreSQL database? Join me for a DB connection optimization experiment.

AliMassoud
Stackademic


If you are a data engineer or MLOps engineer who isn't lucky enough to have access to Snowflake or Databricks to deal with your big data, and you are considering optimizing your DB connection to insert millions of records, then this article is for you.

Objective

Today’s problem is to find an efficient method of inserting data into a PostgreSQL database that can handle big data.

Overview

  • What is ADBC?
  • Create a PostgreSQL database in a Docker container locally.
  • Launch three experiments to write 10M-row and 40M-row tables to a PostgreSQL DB using SQLAlchemy (the most widely known and used Python package for establishing database connections) and ADBC (Arrow Database Connectivity).

GitHub project and sample code to clone here

What is ADBC?

It is a collection of APIs that gives Arrow-native applications access to databases: executing SQL queries, browsing database catalogs, and more.

What is Apache Arrow?

It is a language-independent columnar memory format for flat and hierarchical data. It is designed for zero-copy data reads without serialization overhead (serialization being the conversion of data structures and objects into a format that can be easily transmitted or stored and later reconstructed for use).

What is the purpose of using ADBC?

We use Arrow to avoid making copies of data on reads and to speed up data access by eliminating serialization overhead. In addition, the driver is responsible for converting objects from/to Arrow where required, which lets developers focus on their main objective.
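To make this concrete, here is a minimal sketch (my addition, not from the project's code) of an Arrow-native read with the ADBC PostgreSQL driver; the connection URI and credentials are the same placeholders used in the experiments later on:

import adbc_driver_postgresql.dbapi

uri = "postgresql://<username>:<password>@127.0.0.1:9019/general"  # placeholder credentials

with adbc_driver_postgresql.dbapi.connect(uri) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1 AS answer")
        # The driver returns the result as a pyarrow.Table directly,
        # without converting every row into Python objects first.
        table = cur.fetch_arrow_table()
        print(table)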

Set Up the Experiment Environment

This project expects that you have Python, Docker, and docker-compose installed, and that the Docker daemon is running. Please clone this GitHub project and follow the steps below; I will explain every step as much as possible.

The project file structure is as follows:

ADBC-Driver-with-PostgreSQL/
├── .git
├── docker-compose.yml
├── Makefile
├── requirements.txt
├── README.md
├── testing.ipynb   # Jupyter notebook for testing our functions
└── Scripts/
    └── write_db.py

1- We navigate to the project directory and create a Python virtual environment using the built-in venv package; let's call it .venv, then activate it:

cd ADBC-Driver-with-PostgreSQL
python -m venv .venv
source .venv/bin/activate

You should see (.venv) next to your username in the terminal.

The main reason for using a virtual environment is that it keeps your project's packages within your project's scope instead of forcing you to install them system-wide.

2- Let's install the packages needed for this experiment. Note that they are all listed with their versions in requirements.txt. I have included a very simple Makefile to upgrade pip and install all our dependencies. To use the Makefile, please run the following:

make install

If you are not using Linux, please install make before proceeding, or open the file and copy the commands into your command line, like below:

pip install --upgrade pip # prefix with 'python -m' on Windows
pip install -r requirements.txt # prefix with 'python -m' on Windows

3- Once the installation is done, please execute the following command to deploy a PostgreSQL database server in a Docker container locally:

docker-compose up -d

The up command of docker-compose builds, (re)creates, starts, and attaches to containers for a service (the PostgreSQL server is our service today).

The -d flag stands for detach; it runs the container in the background.

The Docker container runs a PostgreSQL database server, which lets us simulate a real-life setup and demonstrate a real database insertion experiment. Feel free to check out the comments in the docker-compose file for more details.

Check whether the container is healthy and running using:

docker ps

If no errors are encountered, we are ready to experiment!
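As an optional extra check (my addition, not a step from the repository), you can confirm from Python that the containerized server accepts connections; the URL below uses the same placeholder credentials and port as the rest of this article:

import sqlalchemy

url = "postgresql://<username>:<password>@127.0.0.1:9019/general"  # placeholder credentials
engine = sqlalchemy.create_engine(url)

# A trivial round trip: if the container is healthy, this prints 1.
with engine.connect() as conn:
    print(conn.execute(sqlalchemy.text("SELECT 1")).scalar())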

Random Data Generation

Please check the code below, in which I generate a pandas dataframe with 10 million rows and three columns (integers, floats, and strings).

I execute my code in 'testing.ipynb', which is a Jupyter notebook file.

import pandas as pd
import numpy as np
import random
import string
import adbc_driver_postgresql.dbapi
import pyarrow.parquet

# Number of rows in the DataFrame
num_rows = 10000000

# Generate random integers
integers = np.random.randint(1, 10000, size=num_rows)

# Generate random floats
floats = np.random.random(size=num_rows)

# Generate random strings
strings = [''.join(random.choices(string.ascii_uppercase + string.digits, k=10)) for _ in range(num_rows)]

# Create DataFrame
df = pd.DataFrame({
    'myintegers': integers,
    'myfloats': floats,
    'mystrings': strings
})
df.head()

This will give us the following dataframe:

Output of df.head() at the end of the cell.

Data description: the integers range between 1 and 10,000, the floats are between 0 and 1, and the strings are random with a length of 10 characters.
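To get a rough sense of the volume we are about to push (a quick check I added, not part of the original notebook), you can ask pandas for the dataframe's in-memory footprint:

# Approximate in-memory size of the generated dataframe, in megabytes.
# deep=True makes pandas measure the Python string objects as well.
size_mb = df.memory_usage(deep=True).sum() / 1024**2
print(f"~{size_mb:,.0f} MB in memory")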

Experiment 1

Let's create an SQLAlchemy engine and write our dataframe to the DB using that engine:

# Create our URL and the mode of data insertion.
url = 'postgresql://<username>:<password>@127.0.0.1:9019/general'
mode = 'create'

# Import our functions, which are written in the Scripts/write_db.py file
from Scripts.write_db import write_to_db, write_to_db_sqlalchemy, write_to_db_adbc

# The write_to_db() function is:
# Write a dataframe to a database table using df.to_sql
# (write_db.py imports create_engine from sqlalchemy)
def write_to_db(df: pd.DataFrame, table_name: str, url: str) -> None:
    engine = create_engine(url)
    df.to_sql(table_name, engine, if_exists="replace", index=False)
    print(f"Dataframe written to table {table_name} in the database")

# We create a table_name variable (we will change the table name for each function).
table_name = 'random_data1'  # example name; pick any table name you like
write_to_db(df, table_name, url)

Output of write_to_db(), which uses pd.to_sql()

Results: this code takes around 5 minutes and 45.5 seconds to push 10 million rows into a PostgreSQL table.
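If you want to reproduce these numbers yourself, a simple wall-clock measurement is enough; the snippet below is my own sketch and is not part of the repository:

import time

start = time.perf_counter()
write_to_db(df, table_name, url)  # swap in any of the three functions being compared
print(f"Insertion took {time.perf_counter() - start:.1f} seconds")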

Experiment 2

I looked for better SQLAlchemy-based code and found this snippet on Stack Overflow, which performs very well:

# The write_to_db_sqlalchemy() function is:
# Write a dataframe to a database table using SQLAlchemy
# (write_db.py imports create_engine from sqlalchemy and StringIO from io)
def write_to_db_sqlalchemy(df: pd.DataFrame, table_name: str, url: str) -> None:
    engine = create_engine(url)
    connection = engine.raw_connection()
    cursor = connection.cursor()

    # create the table, but first drop it if it already exists
    command = f"""DROP TABLE IF EXISTS {table_name}; CREATE TABLE {table_name} ("Integers" integer,"Floats" double precision,"Strings" text);"""

    cursor.execute(command)
    connection.commit()

    # stream the data using 'to_csv' and StringIO(); then use SQL's 'copy_from' function
    output = StringIO()
    # ignore the index
    df.to_csv(output, sep="\t", header=False, index=False)
    # jump to the start of the stream
    output.seek(0)
    contents = output.getvalue()
    cur = connection.cursor()
    # null values become ''
    cur.copy_from(output, table_name, null="")
    connection.commit()
    cur.close()
    connection.close()
    print("Dataframe written to table in the database")

table_name = 'random_data2'
write_to_db_sqlalchemy(df, table_name, url)

The description below is based on the comments on Stack Overflow:

This code works only with PostgreSQL (due to copy_from). It converts the dataframe to CSV format and passes it to StringIO, which provides an in-memory text buffer, then uses that stream to write the data to the database table.

Results of write_to_db_sqlalchemy(), which uses cur.copy_from() from Stack Overflow

Results: This code inserts 10 million records within 1 minute and 4.6 seconds.

Could we do better? Let’s jump to ADBC Experiment.

Experiment 3

In this experiment, we will leverage the compression of Parquet (Apache Parquet is a column-based file format) and use it with the ADBC driver to write to the database. We write the dataframe to a Parquet file, which I named 'random_data1.parquet', then open the Parquet file and push its rows to the database.

# The write_to_db_adbc() function is:
# Write a dataframe to a database table using the ADBC driver
def write_to_db_adbc(
    df: pd.DataFrame,
    table_name: str,
    url: str,
    MODE: str = "create",
    BATCH_SIZE: int = 150000,
) -> None:
    df.to_parquet("random_data1.parquet")
    with adbc_driver_postgresql.dbapi.connect(url) as conn:
        with conn.cursor() as cur:
            table = pyarrow.parquet.ParquetFile("./random_data1.parquet")
            print(f"{MODE} table {table_name} in postgres...")
            cur.adbc_ingest(
                table_name, table.iter_batches(batch_size=BATCH_SIZE), mode=MODE
            )

        conn.commit()
        cur.close()   # redundant here: the with blocks already close cursor and connection
        conn.close()
    print("Dataframe written to table in the database")

table_name = 'random_data3'
write_to_db_adbc(df, table_name, url, MODE=mode, BATCH_SIZE=150000)
Output of the ADBC PostgreSQL driver on a 10M-row insertion

Results: This code inserts 10 million rows to a PostgreSQL database table within 32.4 seconds!

40 Million Rows

If we replicate the same experiment with a 40-million-row insertion (just change the num_rows variable), we get the following results:

Experiment 1: I had to abort the run since it took more than 20 minutes.

Experiment 2 (Stackoverflow code): 4 minutes 30 seconds.

SQLAlchemy output on 40M rows insertion

Experiment 3 (ADBC with Parquet): 1 minute 41 seconds.

ADBC Driver on 40M rows insertion

ADBC could be optimized further by fine-tuning the BATCH_SIZE value.
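One way to do that tuning, sketched below (my addition; the candidate batch sizes and table names are arbitrary examples), is to time write_to_db_adbc() over a few values and keep the fastest for your data and hardware:

import time

# Hypothetical sweep over a few batch sizes; adjust the candidates to taste.
for batch_size in (50_000, 150_000, 500_000, 1_000_000):
    start = time.perf_counter()
    write_to_db_adbc(df, f"random_data_bs_{batch_size}", url,
                     MODE="create", BATCH_SIZE=batch_size)
    print(f"batch_size={batch_size:>9,}: {time.perf_counter() - start:.1f} s")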

Conclusion

From our experiment, we can observe the efficiency of Arrow Database Connectivity (ADBC) and the speed-up we get when writing millions of rows to a PostgreSQL database; the bigger the data, the more evident ADBC's performance advantage becomes.

I hope you enjoyed this article,

Happy coding!

Resources
