Query Data from PostgreSQL Using Luigi
There is a luigi.postgres module (luigi.contrib.postgres in recent Luigi releases), which is perfectly capable of getting data into a PostgreSQL database with little effort, but how do you pull data out of a table from within a Luigi task? Here is an example of how to read data from Postgres using an ordinary SQL query. This code fragment is taken from a larger, self-contained Luigi data flow example, which is introduced in the "Try Luigi with Vagrant" article. You can see it in context in the GitHub repository.
# these modules are needed for the task
import luigi
import psycopg2


class QueryPostgres(luigi.Task):

    def output(self):
        # the output will be a .csv file
        return luigi.LocalTarget("/home/vagrant/data/same_purchases.csv")

    def run(self):
        # these are here for convenience, you'll use
        # environment variables for production code
        host = "localhost"
        database = "vagrantdb"
        user = "vagrant"
        password = "vagrant"

        conn = psycopg2.connect(
            dbname=database,
            user=user,
            host=host,
            password=password)
        try:
            cur = conn.cursor()
            cur.execute("""SELECT
                  name,
                  date,
                  price,
                  amount
                FROM purchases
            """)
            rows = cur.fetchall()
        finally:
            # release the connection even if the query fails
            conn.close()

        with self.output().open("w") as out_file:
            # write a csv header 'by hand'
            out_file.write("name, date, price, amount")
            for row in rows:
                out_file.write("\n")
                # without the :%s, the date will be output in year-month-day format;
                # %s is handed to strftime and is platform-specific
                # (on Linux it typically yields Unix epoch seconds)
                # the star before row unpacks each element into its own format slot
                out_file.write("{}, {:%s}, {}, {}".format(*row))
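The header and rows above are formatted by hand, which works until a value contains a comma. As a possible alternative, here is a minimal sketch that hands the formatting to Python's standard csv module. The helper name write_rows_as_csv is hypothetical and not part of the original example. Note that the output differs slightly from the task above: there are no spaces after the commas, and dates are written via str(), that is, in year-month-day form.

```python
import csv
import datetime
import io


def write_rows_as_csv(out_file, header, rows):
    """Write a header and DB-API style row tuples to an open text file."""
    # lineterminator="\n" avoids the csv module's default "\r\n" endings
    writer = csv.writer(out_file, lineterminator="\n")
    writer.writerow(header)
    # values containing commas or quotes are quoted automatically
    writer.writerows(rows)


# small demonstration with an in-memory file and made-up sample rows
buffer = io.StringIO()
write_rows_as_csv(
    buffer,
    ["name", "date", "price", "amount"],
    [("Smith, J", datetime.date(2020, 1, 2), 9.5, 3)],
)
print(buffer.getvalue())
```

Inside the task, the same helper could be called with the file object returned by self.output().open("w") and the rows returned by cur.fetchall().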
This task connects to a Postgres database through the psycopg2 module using the provided credentials, executes a simple SELECT query, and dumps the results into a CSV file whose location is defined in the output method. The class can be used in a Luigi data pipeline and will play nicely with other tasks if given non-input dependencies through a _requires method.
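The comment in run() notes that production code should take its credentials from environment variables rather than hard-coded strings. One way this might look is sketched below. The helper connection_params_from_env is hypothetical, and the variable names PGHOST, PGDATABASE, PGUSER and PGPASSWORD are an assumption chosen because they match the names that PostgreSQL's own client library recognizes; any other naming scheme would work just as well.

```python
import os


def connection_params_from_env(env=None):
    """Build keyword arguments for psycopg2.connect() from environment variables."""
    # allow a plain dict to be passed in, which makes the helper easy to test
    env = os.environ if env is None else env
    return {
        # the host falls back to localhost; the other values are required
        "host": env.get("PGHOST", "localhost"),
        "dbname": env["PGDATABASE"],
        "user": env["PGUSER"],
        "password": env["PGPASSWORD"],
    }


# inside run(), the hard-coded block could then become:
#     conn = psycopg2.connect(**connection_params_from_env())
```

A missing required variable raises a KeyError as soon as the task runs, which is usually preferable to silently connecting with default credentials.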