We often encounter data as relational databases. We would generally need to write raw SQL queries, pass them to the database engine and parse the returned results as a normal array of records to work with them.
What Is Python SQLAlchemy?
SQLAlchemy provides a “Pythonic” way of interacting with those databases. Rather than dealing with the differences between specific dialects of traditional SQL, such as MySQL, PostgreSQL or Oracle, you can leverage the Pythonic framework of SQLAlchemy to streamline your workflow and more efficiently query your data.
How to Use Python SQLAlchemy
Installing the Package
pip install sqlalchemy
Connecting to a Database
To start interacting with the database, we first need to establish a connection.
import sqlalchemy as db
engine = db.create_engine('dialect+driver://user:pass@host:port/db')
Viewing Table Details
SQLAlchemy can be used to automatically load tables from a database using reflection. Reflection is the process of reading the database and building the metadata based on that information.
For example:
import sqlalchemy as db
engine = db.create_engine('sqlite:///census.sqlite')
connection = engine.connect()
metadata = db.MetaData()
census = db.Table('census', metadata, autoload=True, autoload_with=engine)
# Print the column names
print(census.columns.keys())
['state', 'sex', 'age', 'pop2000', 'pop2008']
# Print full table metadata
print(repr(metadata.tables['census']))
Table('census', MetaData(bind=None), Column('state', VARCHAR(length=30), table=), Column('sex', VARCHAR(length=1), table=), Column('age', INTEGER(), table=), Column('pop2000', INTEGER(), table=), Column('pop2008', INTEGER(), table=), schema=None)
Querying
Table
and MetaData
have already been imported. The metadata is available as metadata
.
import sqlalchemy as db
engine = db.create_engine('sqlite:///census.sqlite')
connection = engine.connect()
metadata = db.MetaData()
census = db.Table('census', metadata, autoload=True, autoload_with=engine)
#Equivalent to 'SELECT * FROM census'
query = db.select([census])
ResultProxy = connection.execute(query)
ResultSet = ResultProxy.fetchall()
ResultSet[:3]
[('Illinois', 'M', 0, 89600, 95012),
('Illinois', 'M', 1, 88445, 91829),
('Illinois', 'M', 2, 88729, 89547)]
There are a couple functions to know, including:
- ResultProxy: The object returned by the .execute() method. It can be used in a variety of ways to return data via a query.
- ResultSet: The actual data asked for in the query when using a fetch method such as
.fetchall()
on a ResultProxy.
Dealing With a Large ResultSet
We use .fetchmany()
to load optimal no of rows and overcome memory issues in case of large datasets.
while flag:
partial_results = ResultProxy.fetchmany(50)
if(partial_results == []):
flag = False
//
code
//
ResultProxy.close()
Converting to DataFrame
df = pd.DataFrame(ResultSet)
df.columns = ResultSet[0].keys()
How to Filter Data with Python SQLAlchemy
Let’s see some examples of raw SQLite Queries and queries using SQLAlchemy.
Important Functions to Know to Filter Data With Python SQLAlchemy
- Where
- In
- And, or, not
- Order by
- Avg, count, min, max
- Group by
- Distinct
- Case and cast
- Joins
Where
SQL :
SELECT * FROM census
WHERE sex = F
SQLAlchemy :
db.select([census]).where(census.columns.sex == 'F')
In
SQL :
SELECT state, sex
FROM census
WHERE state IN (Texas, New York)
SQLAlchemy :
db.select([census.columns.state, census.columns.sex]).where(census.columns.state.in_(['Texas', 'New York']))
And, Or, Not
SQL :
SELECT * FROM census
WHERE state = 'California' AND NOT sex = 'M'
SQLAlchemy :
db.select([census]).where(db.and_(census.columns.state == 'California', census.columns.sex != 'M'))
Order By
SQL :
SELECT * FROM census
ORDER BY State DESC, pop2000
SQLAlchemy :
db.select([census]).order_by(db.desc(census.columns.state), census.columns.pop2000)
Important Functions to Know for Python SQLAlchemy
SQL :
SELECT SUM(pop2008)
FROM census
SQLAlchemy :
db.select([db.func.sum(census.columns.pop2008)])
Other functions include avg, count, min and max.
Group by
SQL :
SELECT SUM(pop2008) as pop2008, sex
FROM census
SQLAlchemy :
db.select([db.func.sum(census.columns.pop2008).label('pop2008'), census.columns.sex]).group_by(census.columns.sex)
Distinct
SQL :
SELECT DISTINCT state
FROM census
SQLAlchemy :
db.select([census.columns.state.distinct()])
Case and Cast
The case()
expression accepts a list of conditions to match and the column to return if the condition matches, followed by an else_
if none of the conditions match. Use the cast()
function to convert an expression to a particular type.
For example:
import sqlalchemy as db
engine = db.create_engine('sqlite:///census.sqlite')
connection = engine.connect()
metadata = db.MetaData()
census = db.Table('census', metadata, autoload=True, autoload_with=engine)
female_pop = db.func.sum(db.case([(census.columns.sex == 'F', census.columns.pop2000)],else_=0))
total_pop = db.cast(db.func.sum(census.columns.pop2000), db.Float)
query = db.select([female_pop/total_pop * 100])
result = connection.execute(query).scalar()
print(result)
51.09467432293413
We use .scalar
to the result when the result contains only a single value.
Joins
If you have two tables that already have an established relationship, you can automatically use that relationship by just adding the columns we want from each table to the select statement. Or you can do it manually.
select([census.columns.pop2008, state_fact.columns.abbreviation])
For example, let’s start with importing the database:
import sqlalchemy as db
import pandas as pd
engine = db.create_engine('sqlite:///census.sqlite')
connection = engine.connect()
metadata = db.MetaData()
census = db.Table('census', metadata, autoload=True, autoload_with=engine)
state_fact = db.Table('state_fact', metadata, autoload=True, autoload_with=engine)
Automatic Join
#Automatic Join
query = db.select([census.columns.pop2008, state_fact.columns.abbreviation])
result = connection.execute(query).fetchall()
df = pd.DataFrame(results)
df.columns = results[0].keys()
df.head(5)
state sex age pop2000 pop2008 id name abbreviation country type ... occupied notes fips_state assoc_press standard_federal_region census_region census_region_name census_division census_division_name circuit_court
0 Illinois M 0 89600 95012 13 Illinois IL USA state ... occupied 17 Ill. V 2 Midwest 3 East North Central 7
1 Illinois M 1 88445 91829 13 Illinois IL USA state ... occupied 17 Ill. V 2 Midwest 3 East North Central 7
2 Illinois M 2 88729 89547 13 Illinois IL USA state ... occupied 17 Ill. V 2 Midwest 3 East North Central 7
3 Illinois M 3 88868 90037 13 Illinois IL USA state ... occupied 17 Ill. V 2 Midwest 3 East North Central 7
4 Illinois M 4 91947 91111 13 Illinois IL USA state ... occupied 17 Ill. V 2 Midwest 3 East North Central 7
5 rows × 22 columns
Manual Join
#Manual Join
query = db.select([census, state_fact])
query = query.select_from(census.join(state_fact, census.columns.state == state_fact.columns.name))
results = connection.execute(query).fetchall()
df = pd.DataFrame(results)
df.columns = results[0].keys()
df.head(5)
state sex age pop2000 pop2008 id name abbreviation country type ... occupied notes fips_state assoc_press standard_federal_region census_region census_region_name census_division census_division_name circuit_court
0 Illinois M 0 89600 95012 13 Illinois IL USA state ... occupied 17 Ill. V 2 Midwest 3 East North Central 7
1 Illinois M 1 88445 91829 13 Illinois IL USA state ... occupied 17 Ill. V 2 Midwest 3 East North Central 7
2 Illinois M 2 88729 89547 13 Illinois IL USA state ... occupied 17 Ill. V 2 Midwest 3 East North Central 7
3 Illinois M 3 88868 90037 13 Illinois IL USA state ... occupied 17 Ill. V 2 Midwest 3 East North Central 7
4 Illinois M 4 91947 91111 13 Illinois IL USA state ... occupied 17 Ill. V 2 Midwest 3 East North Central 7
5 rows × 22 columns
How to Create and Insert Data into Tables with Python SQLAlchemy
When you pass the database, which is not present, to the engine, SQLAlchemy automatically creates a new database.
import sqlalchemy as db
import pandas as pd
#Creating Database and Table
engine = db.create_engine('sqlite:///test.sqlite') #Create test.sqlite automatically
connection = engine.connect()
metadata = db.MetaData()
emp = db.Table('emp', metadata,
db.Column('Id', db.Integer()),
db.Column('name', db.String(255), nullable=False),
db.Column('salary', db.Float(), default=100.0),
db.Column('active', db.Boolean(), default=True)
)
metadata.create_all(engine) #Creates the table
#Inserting Data
#Inserting record one by one
query = db.insert(emp).values(Id=1, name='naveen', salary=60000.00, active=True)
ResultProxy = connection.execute(query)
#Inserting many records at ones
query = db.insert(emp)
values_list = [{'Id':'2', 'name':'ram', 'salary':80000, 'active':False},
{'Id':'3', 'name':'ramesh', 'salary':70000, 'active':True}]
ResultProxy = connection.execute(query,values_list)
results = connection.execute(db.select([emp])).fetchall()
df = pd.DataFrame(results)
df.columns = results[0].keys()
df.head(4)
Id name salary active
0 1 vinay 60000.0 True
1 1 satvik 60000.0 True
2 1 naveen 60000.0 True
3 2 rahul 80000.0 False
Updating Data in Databases
db.update(table_name).values(attribute = new_value).where(condition)
import sqlalchemy as db
import pandas as pd
engine = db.create_engine('sqlite:///test.sqlite')
metadata = db.MetaData()
connection = engine.connect()
emp = db.Table('emp', metadata, autoload=True, autoload_with=engine)
results = connection.execute(db.select([emp])).fetchall()
df = pd.DataFrame(results)
df.columns = results[0].keys()
df.head(4)
Id name salary active
0 1 vinay 60000.0 True
1 1 satvik 60000.0 True
2 1 naveen 60000.0 True
3 2 rahul 80000.0 False
# Build a statement to update the salary to 100000
query = db.update(emp).values(salary = 100000)
query = query.where(emp.columns.Id == 1)
results = connection.execute(query)
results = connection.execute(db.select([emp])).fetchall()
df = pd.DataFrame(results)
df.columns = results[0].keys()
df.head(4)
Id name salary active
0 1 vinay 100000.0 True
1 1 satvik 100000.0 True
2 1 naveen 100000.0 True
3 2 rahul 80000.0 False
Delete Table
db.delete(table_name).where(condition)
import sqlalchemy as db
import pandas as pd
engine = db.create_engine('sqlite:///test.sqlite')
metadata = db.MetaData()
connection = engine.connect()
emp = db.Table('emp', metadata, autoload=True, autoload_with=engine)
results = connection.execute(db.select([emp])).fetchall()
df = pd.DataFrame(results)
df.columns = results[0].keys()
df.head(4)
Id name salary active
0 1 vinay 100000.0 True
1 1 satvik 100000.0 True
2 1 naveen 100000.0 True
3 2 rahul 80000.0 False
# Build a statement to delete where salary < 100000
query = db.delete(emp)
query = query.where(emp.columns.salary < 100000)
results = connection.execute(query)
results = connection.execute(db.select([emp])).fetchall()
df = pd.DataFrame(results)
df.columns = results[0].keys()
df.head(4)
Id name salary active
0 1 vinay 100000.0 True
1 1 satvik 100000.0 True
2 1 naveen 100000.0 True
Dropping a Table
table_name.drop(engine) #drops a single table
metadata.drop_all(engine) #drops all the tables in the database