How to Import CSV Files into PostgreSQL Automatically
If you work with databases, manually uploading CSV data into PostgreSQL is a bottleneck. It’s slow, prone to human error, and creates a lag between data generation and availability.
In this guide, I will show you how to build a real-time data pipeline. Using Python's watchdog library, we will create a script that "listens" to a specific folder. The moment a new CSV is dropped in, the script wakes up, parses the file, and pushes the data to PostgreSQL instantly.
The Architecture
Before writing code, it is helpful to understand the flow. We are moving from a manual "Pull" method to an automated "Event-Driven" method.
- The Trigger: OS file-system events, monitored by watchdog.
- The Processor: a Python script using the pandas or csv module.
- The Destination: a PostgreSQL table, written to via psycopg2.
Step 1: Prerequisites
You will need a few libraries to handle the file monitoring and database connection. Run the following command in your terminal:
pip install psycopg2-binary watchdog pandas
Step 2: The Watchdog Logic
The core of this automation is the FileSystemEventHandler. Instead of running a loop that checks the folder every second (which wastes CPU), we wait for the OS to tell us a file has been created.
Here is the critical snippet of the Python logic:
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class CSVHandler(FileSystemEventHandler):
    def on_created(self, event):
        # Ignore directories and anything that isn't a CSV
        if not event.is_directory and event.src_path.endswith(".csv"):
            print(f"New CSV detected: {event.src_path}")
            # Trigger the upload function here
            upload_to_postgres(event.src_path)

if __name__ == "__main__":
    observer = Observer()
    event_handler = CSVHandler()
    observer.schedule(event_handler, path="./my_data_folder", recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)  # keep the main thread alive while the observer runs
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
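The handler above calls an upload_to_postgres function that we haven't defined yet. Here is a minimal sketch of what it could look like, assuming the sales_data table used later in this guide; the connection details (host, database name, credentials) are placeholders you should replace with your own:

```python
import csv

def read_csv_rows(csv_path):
    # Parse the CSV into (id, product, amount) tuples.
    with open(csv_path, newline="") as f:
        reader = csv.DictReader(f)
        return [(row["id"], row["product"], row["amount"]) for row in reader]

def upload_to_postgres(csv_path):
    import psycopg2  # imported here so the parser above works without a database

    # Placeholder connection details -- replace with your own config.
    conn = psycopg2.connect(
        host="localhost", dbname="mydb", user="myuser", password="secret"
    )
    try:
        with conn, conn.cursor() as cur:
            cur.executemany(
                "INSERT INTO sales_data (id, product, amount) VALUES (%s, %s, %s)",
                read_csv_rows(csv_path),
            )
    finally:
        conn.close()
```

Splitting the parsing out into read_csv_rows keeps the file-reading logic testable without a live database connection.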
Step 3: Handling Duplicates (Upsert)
A common issue with automation is re-uploading the same file twice. If your table has a Primary Key (e.g., transaction_id), PostgreSQL will throw an error if you try to insert a duplicate.
To handle this gracefully, we use the ON CONFLICT clause in our SQL query:
INSERT INTO sales_data (id, product, amount) VALUES (%s, %s, %s) ON CONFLICT (id) DO UPDATE SET amount = EXCLUDED.amount;
This ensures that if the ID already exists, the script updates the existing record instead of crashing.
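Wired into the Python side, the upsert can be issued with executemany. A sketch (the sales_data columns match the example above; conn is assumed to be an open psycopg2 connection):

```python
# Upsert statement matching the sales_data example above.
UPSERT_SQL = (
    "INSERT INTO sales_data (id, product, amount) "
    "VALUES (%s, %s, %s) "
    "ON CONFLICT (id) DO UPDATE SET amount = EXCLUDED.amount;"
)

def upsert_rows(conn, rows):
    # rows is a list of (id, product, amount) tuples;
    # conn is an open psycopg2 connection.
    with conn, conn.cursor() as cur:
        cur.executemany(UPSERT_SQL, rows)
```

Using the connection as a context manager commits the transaction on success and rolls it back if any row fails.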
Watch the Full Video Tutorial
I have recorded a complete coding session where I demonstrate the script running live, dropping files, and verifying the data in pgAdmin.
Source Code & Resources
You can download the full, production-ready script (including error logging and database config) from the GitHub repository:
📂 Download Python Script (GitHub)
Troubleshooting Tips
- File Locked Error: Sometimes watchdog triggers before the file is fully copied. You may need to add a small time.sleep(1) before processing.
- Permission Denied: Ensure the user running the Python script has Read/Write access to the monitored folder.
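If a fixed sleep isn't reliable enough for large files, one option is to wait until the file size stops changing before processing it. A simple sketch (the function name and parameters are my own, not part of watchdog):

```python
import os
import time

def wait_until_stable(path, checks=3, interval=1.0):
    # Block until the file size is unchanged for `checks`
    # consecutive polls -- a guard against processing a CSV
    # that is still being copied into the folder.
    last_size = -1
    stable = 0
    while stable < checks:
        try:
            size = os.path.getsize(path)
        except OSError:
            size = -1  # file may briefly be unreadable mid-copy
        if size == last_size and size >= 0:
            stable += 1
        else:
            stable = 0
        last_size = size
        time.sleep(interval)
```

Call wait_until_stable(event.src_path) at the top of on_created before handing the file to the upload function.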
Found this helpful? Subscribe to the channel for more Data Engineering automation tips.