How to Import CSV Files into PostgreSQL Automatically

If you work with databases, manually uploading CSV data into PostgreSQL is a bottleneck. It’s slow, prone to human error, and creates a lag between data generation and availability.

In this guide, I will show you how to build a real-time data pipeline. Using Python's watchdog library, we will create a script that "listens" to a specific folder: the moment a new CSV is dropped in, the script wakes up, parses the file, and pushes the data to PostgreSQL.


The Architecture

Before writing code, it is helpful to understand the flow. We are moving from a manual "Pull" method to an automated "Event-Driven" method.

  • The Trigger: OS file-system events (monitored by watchdog).
  • The Processor: a Python script using pandas or the built-in csv module.
  • The Destination: a PostgreSQL table, written to via psycopg2.

Step 1: Prerequisites

You will need a few libraries to handle the file monitoring and database connection. Run the following command in your terminal:

pip install psycopg2-binary watchdog pandas

Step 2: The Watchdog Logic

The core of this automation is the FileSystemEventHandler. Instead of running a loop that checks the folder every second (which wastes CPU), we wait for the OS to tell us a file has been modified.

Here is the critical snippet of the Python logic:

import time

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

class CSVHandler(FileSystemEventHandler):
    def on_created(self, event):
        # React only to new .csv files, not to directories
        if not event.is_directory and event.src_path.endswith(".csv"):
            print(f"New CSV detected: {event.src_path}")
            # Trigger the upload function here
            upload_to_postgres(event.src_path)

if __name__ == "__main__":
    observer = Observer()
    observer.schedule(CSVHandler(), path="./my_data_folder", recursive=False)
    observer.start()
    try:
        while True:          # keep the main thread alive while watchdog runs
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
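The handler calls upload_to_postgres, which we haven't defined yet. Here is a minimal sketch using the standard csv module and psycopg2; the connection credentials, the sales_data table name, and the helper names are placeholders for illustration, not part of the original script:

```python
import csv

def build_insert_sql(table, columns):
    """Build a parameterized INSERT statement for the given table and columns."""
    cols = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    return f"INSERT INTO {table} ({cols}) VALUES ({placeholders})"

def read_rows(csv_path):
    """Return (header, rows) parsed from a CSV file."""
    with open(csv_path, newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        return header, [tuple(row) for row in reader]

def upload_to_postgres(csv_path, table="sales_data"):
    import psycopg2  # imported here so the parsing helpers work without it
    header, rows = read_rows(csv_path)
    conn = psycopg2.connect(dbname="mydb", user="postgres",
                            password="secret", host="localhost")  # placeholder credentials
    try:
        with conn, conn.cursor() as cur:  # commits on success, rolls back on error
            cur.executemany(build_insert_sql(table, header), rows)
    finally:
        conn.close()
```

Note that this assumes the CSV header names match the table's column names exactly; if they differ, map them explicitly before building the SQL.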

Step 3: Handling Duplicates (Upsert)

A common issue with automation is re-uploading the same file twice. If your table has a primary key (e.g., transaction_id), PostgreSQL will raise a unique-constraint violation when you try to insert a duplicate.

To handle this gracefully, we use the ON CONFLICT clause in our SQL query:

INSERT INTO sales_data (id, product, amount)
VALUES (%s, %s, %s)
ON CONFLICT (id) 
DO UPDATE SET amount = EXCLUDED.amount;

This ensures that if the ID already exists, the script updates the existing record instead of crashing.
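From Python, the same upsert can be issued through psycopg2's executemany. This is a sketch: the connection object is assumed to come from psycopg2.connect, and the table layout matches the sales_data example above:

```python
# Parameterized upsert matching the sales_data example
UPSERT_SQL = """
INSERT INTO sales_data (id, product, amount)
VALUES (%s, %s, %s)
ON CONFLICT (id)
DO UPDATE SET amount = EXCLUDED.amount;
"""

def upsert_rows(conn, rows):
    """Insert rows, updating amount where the id already exists."""
    with conn, conn.cursor() as cur:  # commits on success, rolls back on error
        cur.executemany(UPSERT_SQL, rows)
```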


Watch the Full Video Tutorial

I have recorded a complete coding session where I demonstrate the script running live, dropping files, and verifying the data in pgAdmin.


Source Code & Resources

You can download the full, production-ready script (including error logging and database config) from the GitHub repository:

📂 Download Python Script (GitHub)


Troubleshooting Tips

  • File Locked Error: Sometimes watchdog triggers before the file is fully copied. You may need to add a small time.sleep(1) before processing.
  • Permission Denied: Ensure the user running the Python script has Read/Write access to the monitored folder.
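A more robust alternative to a fixed sleep is to wait until the file size stops changing before processing it. This is a sketch; the check count and polling interval are arbitrary values you should tune for your environment:

```python
import os
import time

def wait_until_stable(path, checks=3, interval=1.0):
    """Return the file size once it has stayed constant for `checks` reads in a row."""
    last_size = -1
    stable_reads = 0
    while stable_reads < checks:
        size = os.path.getsize(path)
        if size == last_size:
            stable_reads += 1
        else:
            stable_reads = 0   # size changed, restart the count
            last_size = size
        time.sleep(interval)
    return size
```

Call wait_until_stable(event.src_path) at the top of your upload function before opening the CSV.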

Found this helpful? Subscribe to the channel for more Data Engineering automation tips.
