Python SQLAlchemy: A Tutorial

Python SQLAlchemy provides a Pythonic way of interacting with relational databases and can help you streamline your workflow. Here’s what you need to know.

Written by Vinay Kudari
Published on Nov. 10, 2022

Data often lives in relational databases. To work with it, we would normally have to write raw SQL queries, pass them to the database engine and parse the returned results as a plain array of records.

What Is Python SQLAlchemy?

Python SQLAlchemy is a database toolkit that provides a "Pythonic" way of interacting with relational databases. Rather than dealing with the differences between specific dialects of SQL, such as MySQL, PostgreSQL or Oracle, you can leverage SQLAlchemy's framework to streamline your workflow and query your data more efficiently.

 

How to Use Python SQLAlchemy

Installing the Package

pip install sqlalchemy

 

Connecting to a Database

To start interacting with the database, we first need to establish a connection.

import sqlalchemy as db
engine = db.create_engine('dialect+driver://user:pass@host:port/db')
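The URL template above breaks down into a dialect, an optional driver, credentials, a host, a port and a database name. As a minimal sketch (assuming only that SQLAlchemy is installed), an in-memory SQLite database needs none of the server parts, so the URL collapses to just the dialect prefix:

```python
import sqlalchemy as db

# An in-memory SQLite database needs no server, user or password.
engine = db.create_engine('sqlite://')

# A PostgreSQL URL would fill in every part of the template; the
# credentials below are placeholders, not a real server:
# engine = db.create_engine('postgresql+psycopg2://user:pass@localhost:5432/mydb')

connection = engine.connect()
print(engine.dialect.name)  # the dialect parsed from the URL: 'sqlite'
connection.close()
```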


 

Viewing Table Details

SQLAlchemy can be used to automatically load tables from a database using reflection. Reflection is the process of reading the database and building the metadata based on that information.

For example:

import sqlalchemy as db

engine = db.create_engine('sqlite:///census.sqlite')
connection = engine.connect()
metadata = db.MetaData()
census = db.Table('census', metadata, autoload=True, autoload_with=engine)

# Print the column names
print(census.columns.keys())

['state', 'sex', 'age', 'pop2000', 'pop2008']

# Print full table metadata
print(repr(metadata.tables['census']))

Table('census', MetaData(bind=None), Column('state', VARCHAR(length=30), table=<census>), Column('sex', VARCHAR(length=1), table=<census>), Column('age', INTEGER(), table=<census>), Column('pop2000', INTEGER(), table=<census>), Column('pop2008', INTEGER(), table=<census>), schema=None)
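Reflection can also pull in every table at once with MetaData.reflect(). A minimal sketch against a throwaway in-memory database (the census.sqlite file is not required here, and in SQLAlchemy 1.4+ reflecting this way, or passing autoload_with=engine alone, is enough):

```python
import sqlalchemy as db

# Build a throwaway database to reflect; the single 'state' column
# is an assumption for illustration.
engine = db.create_engine('sqlite://')
setup = db.MetaData()
db.Table('census', setup, db.Column('state', db.String(30)))
setup.create_all(engine)

# Reflect everything the database reports into a fresh MetaData.
metadata = db.MetaData()
metadata.reflect(bind=engine)
census = metadata.tables['census']
print(census.columns.keys())  # ['state']
```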

 

Querying

With the census table reflected as above, we can start writing queries against it.

import sqlalchemy as db

engine = db.create_engine('sqlite:///census.sqlite')
connection = engine.connect()
metadata = db.MetaData()
census = db.Table('census', metadata, autoload=True, autoload_with=engine)

#Equivalent to 'SELECT * FROM census'
query = db.select([census]) 

ResultProxy = connection.execute(query)

ResultSet = ResultProxy.fetchall()

ResultSet[:3]

[('Illinois', 'M', 0, 89600, 95012),
 ('Illinois', 'M', 1, 88445, 91829),
 ('Illinois', 'M', 2, 88729, 89547)]

There are a couple of objects to know here:

  • ResultProxy: The object returned by the .execute() method. It can be used in a variety of ways to return data via a query.
  • ResultSet: The actual data asked for in the query when using a fetch method such as .fetchall() on a ResultProxy.

 

Dealing With a Large ResultSet

We use .fetchmany() to load rows in manageable batches and avoid memory issues when the result set is large.

flag = True
while flag:
    partial_results = ResultProxy.fetchmany(50)
    if partial_results == []:
        flag = False
    # process the batch of rows here
ResultProxy.close()
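A runnable version of the batching loop, sketched against a small throwaway table since census.sqlite may not be at hand (the nums table and its 130 rows are assumptions for illustration, and select() is written in the newer 1.4+ style without a list):

```python
import sqlalchemy as db

# Build a small throwaway table so the batching loop is runnable.
engine = db.create_engine('sqlite://')
metadata = db.MetaData()
nums = db.Table('nums', metadata, db.Column('n', db.Integer()))
metadata.create_all(engine)

# engine.begin() opens a transaction and commits it on exit.
with engine.begin() as conn:
    conn.execute(db.insert(nums), [{'n': i} for i in range(130)])

# Stream the rows in batches of 50 instead of loading them all at once.
with engine.connect() as conn:
    result = conn.execute(db.select(nums))
    total = 0
    while True:
        batch = result.fetchmany(50)
        if not batch:
            break
        total += len(batch)   # process the batch here
    result.close()

print(total)  # 130
```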

 

Converting to DataFrame

import pandas as pd

df = pd.DataFrame(ResultSet)
df.columns = ResultSet[0].keys()
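Alternatively, pandas can run the query and build the DataFrame in one step via pd.read_sql(). A sketch against a throwaway stand-in table (the table and its single row are assumptions for illustration):

```python
import pandas as pd
import sqlalchemy as db

# A tiny stand-in table, since census.sqlite may not be at hand.
engine = db.create_engine('sqlite://')
metadata = db.MetaData()
t = db.Table('t', metadata,
             db.Column('state', db.String(30)),
             db.Column('pop2000', db.Integer()))
metadata.create_all(engine)
with engine.begin() as conn:
    conn.execute(db.insert(t), [{'state': 'Illinois', 'pop2000': 89600}])

# pandas issues the SELECT and builds the DataFrame, column names included.
df = pd.read_sql(db.select(t), engine)
print(df.columns.tolist())  # ['state', 'pop2000']
```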

 

How to Filter Data with Python SQLAlchemy

Let’s compare some raw SQL queries with their SQLAlchemy equivalents.

Important Functions to Know to Filter Data With Python SQLAlchemy

  • Where
  • In
  • And, or, not
  • Order by
  • Avg, count, min, max
  • Group by
  • Distinct
  • Case and cast
  • Joins

 

Where

SQL :
SELECT * FROM census
WHERE sex = 'F'

SQLAlchemy :
db.select([census]).where(census.columns.sex == 'F')

 

In

SQL :
SELECT state, sex
FROM census
WHERE state IN ('Texas', 'New York')

SQLAlchemy :
db.select([census.columns.state, census.columns.sex]).where(census.columns.state.in_(['Texas', 'New York']))

 

And, Or, Not

SQL :
SELECT * FROM census
WHERE state = 'California' AND NOT sex = 'M'

SQLAlchemy :
db.select([census]).where(db.and_(census.columns.state == 'California', census.columns.sex != 'M'))

 

Order By

SQL :
SELECT * FROM census
ORDER BY state DESC, pop2000

SQLAlchemy :
db.select([census]).order_by(db.desc(census.columns.state), census.columns.pop2000)
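To see the filters above actually run, here is a sketch against a miniature stand-in for the census table (the three rows are made up for illustration, and select() is written in the newer 1.4+ style without a list):

```python
import sqlalchemy as db

# A miniature census stand-in so the filters are runnable without census.sqlite.
engine = db.create_engine('sqlite://')
metadata = db.MetaData()
census = db.Table('census', metadata,
                  db.Column('state', db.String(30)),
                  db.Column('sex', db.String(1)),
                  db.Column('pop2000', db.Integer()))
metadata.create_all(engine)
with engine.begin() as conn:
    conn.execute(db.insert(census), [
        {'state': 'Texas', 'sex': 'F', 'pop2000': 100},
        {'state': 'New York', 'sex': 'M', 'pop2000': 200},
        {'state': 'California', 'sex': 'F', 'pop2000': 300},
    ])

with engine.connect() as conn:
    # WHERE sex = 'F'
    rows = conn.execute(db.select(census).where(census.columns.sex == 'F')).fetchall()

    # IN (...) combined with ORDER BY ... DESC
    stmt = (db.select(census.columns.state)
            .where(census.columns.state.in_(['Texas', 'New York']))
            .order_by(db.desc(census.columns.state)))
    states = [row.state for row in conn.execute(stmt)]

print(len(rows), states)  # 2 ['Texas', 'New York']
```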

 

Sum and Other Aggregate Functions

SQL :
SELECT SUM(pop2008)
FROM census

SQLAlchemy :
db.select([db.func.sum(census.columns.pop2008)])
Other functions include avg, count, min and max.
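A sketch showing sum alongside count and min, again against a throwaway stand-in table (its two rows are assumptions for illustration):

```python
import sqlalchemy as db

# Tiny stand-in table with two rows, just to exercise the aggregates.
engine = db.create_engine('sqlite://')
metadata = db.MetaData()
census = db.Table('census', metadata, db.Column('pop2008', db.Integer()))
metadata.create_all(engine)
with engine.begin() as conn:
    conn.execute(db.insert(census), [{'pop2008': 10}, {'pop2008': 30}])

with engine.connect() as conn:
    total = conn.execute(db.select(db.func.sum(census.columns.pop2008))).scalar()
    count = conn.execute(db.select(db.func.count(census.columns.pop2008))).scalar()
    smallest = conn.execute(db.select(db.func.min(census.columns.pop2008))).scalar()

print(total, count, smallest)  # 40 2 10
```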

 

Group by

SQL :
SELECT SUM(pop2008) as pop2008, sex
FROM census
GROUP BY sex

SQLAlchemy :
db.select([db.func.sum(census.columns.pop2008).label('pop2008'), census.columns.sex]).group_by(census.columns.sex)

 

Distinct

SQL :
SELECT DISTINCT state
FROM census

SQLAlchemy :
db.select([census.columns.state.distinct()])

 

Case and Cast

The case() expression accepts a list of conditions to match and the column to return if the condition matches, followed by an else_ if none of the conditions match. Use the cast() function to convert an expression to a particular type.

For example:

import sqlalchemy as db

engine = db.create_engine('sqlite:///census.sqlite')
connection = engine.connect()
metadata = db.MetaData()
census = db.Table('census', metadata, autoload=True, autoload_with=engine)

female_pop = db.func.sum(db.case([(census.columns.sex == 'F', census.columns.pop2000)],else_=0))

total_pop = db.cast(db.func.sum(census.columns.pop2000), db.Float)

query = db.select([female_pop/total_pop * 100])

result = connection.execute(query).scalar()
print(result)

51.09467432293413

We use .scalar() on the result when the query returns only a single value.

 

Joins

If the two tables already have a relationship established (such as a foreign key), you can use it automatically by simply adding the columns you want from each table to the select statement. Otherwise, you can spell the join out manually.

db.select([census.columns.pop2008, state_fact.columns.abbreviation])

For example, let’s start with importing the database:

import sqlalchemy as db
import pandas as pd

engine = db.create_engine('sqlite:///census.sqlite')
connection = engine.connect()
metadata = db.MetaData()

census = db.Table('census', metadata, autoload=True, autoload_with=engine)
state_fact = db.Table('state_fact', metadata, autoload=True, autoload_with=engine) 

 

Automatic Join

#Automatic Join
query = db.select([census.columns.pop2008, state_fact.columns.abbreviation])
results = connection.execute(query).fetchall()
df = pd.DataFrame(results)
df.columns = results[0].keys()
df.head(5)

state	sex	age	pop2000	pop2008	id	name	abbreviation	country	type	...	occupied	notes	fips_state	assoc_press	standard_federal_region	census_region	census_region_name	census_division	census_division_name	circuit_court
0	Illinois	M	0	89600	95012	13	Illinois	IL	USA	state	...	occupied		17	Ill.	V	2	Midwest	3	East North Central	7
1	Illinois	M	1	88445	91829	13	Illinois	IL	USA	state	...	occupied		17	Ill.	V	2	Midwest	3	East North Central	7
2	Illinois	M	2	88729	89547	13	Illinois	IL	USA	state	...	occupied		17	Ill.	V	2	Midwest	3	East North Central	7
3	Illinois	M	3	88868	90037	13	Illinois	IL	USA	state	...	occupied		17	Ill.	V	2	Midwest	3	East North Central	7
4	Illinois	M	4	91947	91111	13	Illinois	IL	USA	state	...	occupied		17	Ill.	V	2	Midwest	3	East North Central	7
5 rows × 22 columns

 

Manual Join 

#Manual Join
query = db.select([census, state_fact])
query = query.select_from(census.join(state_fact, census.columns.state == state_fact.columns.name))
results = connection.execute(query).fetchall()
df = pd.DataFrame(results)
df.columns = results[0].keys()
df.head(5)

state	sex	age	pop2000	pop2008	id	name	abbreviation	country	type	...	occupied	notes	fips_state	assoc_press	standard_federal_region	census_region	census_region_name	census_division	census_division_name	circuit_court
0	Illinois	M	0	89600	95012	13	Illinois	IL	USA	state	...	occupied		17	Ill.	V	2	Midwest	3	East North Central	7
1	Illinois	M	1	88445	91829	13	Illinois	IL	USA	state	...	occupied		17	Ill.	V	2	Midwest	3	East North Central	7
2	Illinois	M	2	88729	89547	13	Illinois	IL	USA	state	...	occupied		17	Ill.	V	2	Midwest	3	East North Central	7
3	Illinois	M	3	88868	90037	13	Illinois	IL	USA	state	...	occupied		17	Ill.	V	2	Midwest	3	East North Central	7
4	Illinois	M	4	91947	91111	13	Illinois	IL	USA	state	...	occupied		17	Ill.	V	2	Midwest	3	East North Central	7
5 rows × 22 columns
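A self-contained sketch of the manual join, with two tiny stand-in tables (their rows are made up for illustration; since no foreign key is declared, the join condition has to be spelled out):

```python
import sqlalchemy as db

# Two tiny stand-in tables, so the join runs without census.sqlite.
engine = db.create_engine('sqlite://')
metadata = db.MetaData()
census = db.Table('census', metadata,
                  db.Column('state', db.String(30)),
                  db.Column('pop2008', db.Integer()))
state_fact = db.Table('state_fact', metadata,
                      db.Column('name', db.String(30)),
                      db.Column('abbreviation', db.String(2)))
metadata.create_all(engine)
with engine.begin() as conn:
    conn.execute(db.insert(census), [{'state': 'Illinois', 'pop2008': 95012}])
    conn.execute(db.insert(state_fact), [{'name': 'Illinois', 'abbreviation': 'IL'}])

# With no foreign key defined, the join condition must be given explicitly.
stmt = db.select(census.columns.pop2008, state_fact.columns.abbreviation)
stmt = stmt.select_from(
    census.join(state_fact, census.columns.state == state_fact.columns.name))

with engine.connect() as conn:
    rows = conn.execute(stmt).fetchall()

print(rows)  # [(95012, 'IL')]
```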


 

How to Create and Insert Data into Tables with Python SQLAlchemy

If the database you point the engine at does not exist yet, SQLAlchemy creates it automatically.

import sqlalchemy as db
import pandas as pd

#Creating Database and Table
engine = db.create_engine('sqlite:///test.sqlite') #Create test.sqlite automatically
connection = engine.connect()
metadata = db.MetaData()

emp = db.Table('emp', metadata,
              db.Column('Id', db.Integer()),
              db.Column('name', db.String(255), nullable=False),
              db.Column('salary', db.Float(), default=100.0),
              db.Column('active', db.Boolean(), default=True)
              )

metadata.create_all(engine) #Creates the table

#Inserting Data
#Inserting record one by one
query = db.insert(emp).values(Id=1, name='naveen', salary=60000.00, active=True) 
ResultProxy = connection.execute(query)

#Inserting many records at once
query = db.insert(emp) 
values_list = [{'Id':'2', 'name':'ram', 'salary':80000, 'active':False},
               {'Id':'3', 'name':'ramesh', 'salary':70000, 'active':True}]
ResultProxy = connection.execute(query,values_list)

results = connection.execute(db.select([emp])).fetchall()
df = pd.DataFrame(results)
df.columns = results[0].keys()
df.head(4)

Id	name	salary	active
0	1	vinay	60000.0	True
1	1	satvik	60000.0	True
2	1	naveen	60000.0	True
3	2	rahul	80000.0	False
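One caveat worth knowing: with SQLAlchemy 1.4's "future" engines and 2.0, statements executed on a plain connection are no longer auto-committed. A sketch using engine.begin(), which commits when the block exits, and which also shows the column defaults from the table definition filling in omitted values:

```python
import sqlalchemy as db

engine = db.create_engine('sqlite://')
metadata = db.MetaData()
emp = db.Table('emp', metadata,
               db.Column('Id', db.Integer()),
               db.Column('name', db.String(255), nullable=False),
               db.Column('salary', db.Float(), default=100.0),
               db.Column('active', db.Boolean(), default=True))
metadata.create_all(engine)

# engine.begin() opens a transaction and commits it when the block exits.
with engine.begin() as conn:
    # salary and active are omitted, so the column defaults apply.
    conn.execute(db.insert(emp).values(Id=1, name='naveen'))

with engine.connect() as conn:
    row = conn.execute(db.select(emp)).fetchone()

print(row.salary, row.active)  # 100.0 True
```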

 

Updating Data in Databases

db.update(table_name).values(attribute = new_value).where(condition)

import sqlalchemy as db
import pandas as pd

engine = db.create_engine('sqlite:///test.sqlite')
metadata = db.MetaData()
connection = engine.connect()
emp = db.Table('emp', metadata, autoload=True, autoload_with=engine)

results = connection.execute(db.select([emp])).fetchall()
df = pd.DataFrame(results)
df.columns = results[0].keys()
df.head(4)

Id	name	salary	active
0	1	vinay	60000.0	True
1	1	satvik	60000.0	True
2	1	naveen	60000.0	True
3	2	rahul	80000.0	False

# Build a statement to update the salary to 100000
query = db.update(emp).values(salary = 100000)
query = query.where(emp.columns.Id == 1)
results = connection.execute(query)

results = connection.execute(db.select([emp])).fetchall()
df = pd.DataFrame(results)
df.columns = results[0].keys()
df.head(4)

Id	name	salary	active
0	1	vinay	100000.0	True
1	1	satvik	100000.0	True
2	1	naveen	100000.0	True
3	2	rahul	80000.0	False


 

Deleting Rows From a Table

db.delete(table_name).where(condition)
import sqlalchemy as db
import pandas as pd

engine = db.create_engine('sqlite:///test.sqlite')
metadata = db.MetaData()
connection = engine.connect()
emp = db.Table('emp', metadata, autoload=True, autoload_with=engine)

results = connection.execute(db.select([emp])).fetchall()
df = pd.DataFrame(results)
df.columns = results[0].keys()
df.head(4)

Id	name	salary	active
0	1	vinay	100000.0	True
1	1	satvik	100000.0	True
2	1	naveen	100000.0	True
3	2	rahul	80000.0	False

# Build a statement to delete where salary < 100000
query = db.delete(emp)
query = query.where(emp.columns.salary < 100000)
results = connection.execute(query)

results = connection.execute(db.select([emp])).fetchall()
df = pd.DataFrame(results)
df.columns = results[0].keys()
df.head(4)

Id	name	salary	active
0	1	vinay	100000.0	True
1	1	satvik	100000.0	True
2	1	naveen	100000.0	True

 

Dropping a Table

table_name.drop(engine) #drops a single table
metadata.drop_all(engine) #drops all the tables in the database
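A runnable sketch of dropping a single table (the tmp table is an assumption for illustration; passing checkfirst=True, the default is False, skips the drop if the table is already gone):

```python
import sqlalchemy as db

# Create a throwaway table just to drop it again.
engine = db.create_engine('sqlite://')
metadata = db.MetaData()
tmp = db.Table('tmp', metadata, db.Column('x', db.Integer()))
metadata.create_all(engine)

print(db.inspect(engine).get_table_names())  # ['tmp']

# checkfirst=True avoids an error if the table does not exist.
tmp.drop(engine, checkfirst=True)
print(db.inspect(engine).get_table_names())  # []
```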