Query Data from PostgreSQL Using Luigi

There is a luigi.postgres module (luigi.contrib.postgres in newer Luigi releases) that is perfectly capable of getting data into a PostgreSQL database with little effort, but how do you pull data out of a table from within a Luigi task? Here is an example of how to read data from Postgres using an ordinary SQL query. This code fragment is taken from a larger, self-contained Luigi data flow example, which is introduced in the "Try Luigi with Vagrant" article. You can see it in context in the GitHub repository.

# these modules are needed for the task
import luigi
import psycopg2

class QueryPostgres(luigi.Task):
    def output(self):
        # the output will be a .csv file
        return luigi.LocalTarget("/home/vagrant/data/same_purchases.csv")

    def run(self):
        # these are here for convenience, you'll use
        # environment variables for production code
        host = "localhost"
        database = "vagrantdb"
        user = "vagrant"
        password = "vagrant"

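        # open a connection with the credentials defined above
        conn = psycopg2.connect(
            dbname=database,
            user=user,
            host=host,
            password=password)
        cur = conn.cursor()
        # assumption: the column list is inferred from the csv header
        # written below; the original query may also filter rows
        cur.execute("""
          SELECT name, date, price, amount
          FROM purchases
        """)
        rows = cur.fetchall()
        # the rows are in memory now, so the connection can be closed
        conn.close()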

        with self.output().open("w") as out_file:
            # write a csv header 'by hand'
            out_file.write("name, date, price, amount\n")
            for row in rows:
                # without the :%s, the date will be output in year-month-day format;
                # the %s is handed to strftime, which is platform-dependent
                # (seconds since the epoch on glibc systems)
                # the star before row unpacks it, so each element fills one placeholder
                out_file.write("{}, {:%s}, {}, {}\n".format(*row))
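
The by-hand writing at the end of run can also be done with Python's standard csv module, which takes care of line endings and of quoting values that contain commas. This is a swapped-in technique, not what the task above does. A minimal sketch, assuming each row holds the four values named in the header and that the second one is a date object; write_rows and the sample row are hypothetical and made up for illustration:

```python
import csv
import datetime
import io


def write_rows(out_file, rows):
    """Write a header line and the query rows to an open text file as CSV."""
    writer = csv.writer(out_file, lineterminator="\n")
    writer.writerow(["name", "date", "price", "amount"])
    for name, date, price, amount in rows:
        # isoformat() gives year-month-day; use strftime for another layout
        writer.writerow([name, date.isoformat(), price, amount])


# made-up sample row standing in for the result of cur.fetchall()
sample_rows = [("Widget, large", datetime.date(2016, 3, 1), 9.99, 2)]

# an in-memory buffer stands in for the file opened from self.output()
buffer = io.StringIO()
write_rows(buffer, sample_rows)
print(buffer.getvalue())
```

Inside run, the same helper would presumably be called as write_rows(out_file, rows) within the with block, since csv.writer only needs an object with a write method.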

This task connects to a Postgres database through the psycopg2 module using the provided credentials, executes a simple SELECT query, and dumps the results into a csv file whose location is defined in the output method. The class can be used in a Luigi data pipeline, and will play nice with other tasks, if given non-input dependencies through a requires method.
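
The comment in run says that production code should take its credentials from environment variables. One way this might look is sketched below. The helper name connection_settings is hypothetical, the variable names PGHOST, PGDATABASE, PGUSER and PGPASSWORD are an assumption borrowed from the names libpq itself uses, and the fallback values are the Vagrant ones from the example, kept only for convenience:

```python
import os


def connection_settings(environ=os.environ):
    """Collect connection settings from the environment.

    Falls back to the Vagrant values from the example when a variable
    is not set (an assumption made for convenience, not for production).
    """
    return {
        "host": environ.get("PGHOST", "localhost"),
        "dbname": environ.get("PGDATABASE", "vagrantdb"),
        "user": environ.get("PGUSER", "vagrant"),
        "password": environ.get("PGPASSWORD", "vagrant"),
    }


# a plain dict stands in for the real environment in this demonstration;
# db.example.internal is a made-up host name
settings = connection_settings({"PGHOST": "db.example.internal"})
print(settings["host"], settings["dbname"])
```

In the task, the four hard-coded assignments could then give way to conn = psycopg2.connect(**connection_settings()), since psycopg2.connect accepts host, dbname, user and password as keyword arguments.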