Writing and reading large DataFrames

When reading an expensive query from a database, we might want to store the result on disk for later use. Pickling is the obvious choice, but the serialization step holds the entire object in memory at once, which consumes a lot of RAM and may fail if the DataFrame is very large.

A simple solution is to write to .csv files, which can be done chunk-by-chunk.
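As a minimal sketch of the underlying mechanism (not part of the class developed below): pandas' .to_csv() accepts mode='a', so chunks can be appended to a single file, writing the header only for the first chunk. Here chunks stands for any iterable of DataFrames, e.g. a database cursor.

In [ ]:
# Sketch: append chunks to one .csv, writing the header only once.
# `chunks` is assumed to be any iterable of DataFrames.
for i, chunk in enumerate(chunks):
    chunk.to_csv('output.csv', mode='w' if i == 0 else 'a', header=(i == 0), index=True)

The drawback of this bare approach is that nothing about the index name or the column dtypes is recorded, which is exactly what the class below adds.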

This post presents a simple class for writing .csv files in chunks that solves two issues with pandas DataFrame's built-in .to_csv():

  • it keeps the index and its name
  • it keeps the column dtypes (in particular, datetimes)
In [1]:
import pandas as pd
import os
from functools import wraps
from string import ascii_lowercase
import random 
import numpy as np
import time
import json 

We create a DataFrame with one million rows, containing mock data of various dtypes.

In [2]:
len_df = int(1E6)
date_col = [pd.Timestamp(2019, 1, i % 30 + 1) for i in range(len_df)]
text_col = ['text_{}'.format(random.choice(ascii_lowercase)) for _ in range(len_df)]
int_col = [i for i in range(len_df)]
float_col = [random.gauss(1, 1) for _ in range(len_df)]
datadict = {'date':date_col,
           'text':text_col,
           'int':int_col,
           'float':float_col}
In [3]:
df = pd.DataFrame.from_dict(datadict)
df.index = list(range(1000, 1000+len_df))
df.index.name = 'my_index'

And a generator that simulates chunked data ingestion (as would typically result from querying large amounts of data from a database):

In [4]:
def df_chunk_generator(df, chunksize=10000):
    # Group the rows into consecutive blocks of `chunksize` rows by passing an
    # integer array to the "by" argument; each iteration yields a
    # (group_key, chunk) tuple.
    for chunk in df.groupby(by=np.arange(len(df))//chunksize):
        yield chunk

We define a class with the following properties:

  • It can save csv's to disk incrementally
  • It can load the entire .csv or yield chunks
  • It can be instantiated by referring to the .csv file (thereby guaranteeing that column types, compression etc. are matched for loading)
In [5]:
class ChunkedCsv():
    DATETIME_TYPE = 'datetime64[ns]'

    @staticmethod
    def csv_to_json_mapper(x):
        """Maps the .csv path to the path of the companion .json metadata file"""
        return x.replace('.csv', '.json') if x.endswith('.csv') else x + '.json'

    def __init__(self, filename, compression=None, date_cols=None, indexname=None):
        """A .csv saver and loader that preserves column dtypes and index
        
        Parameters
        ==========
        filename (str) : path to file on disk for writing and reading
        compression (None or str) : value for the compression argument of DataFrame.to_csv() [None]
        date_cols (None or list) : columns to parse as datetimes when loading; inferred on first save [None]
        indexname (None or str) : name of the index column; inferred on first save [None]
        """
        self.filename = filename
        self.compression = compression
        self.date_cols = date_cols
        self.indexname = indexname

        self.save_iteration = 0

    def save(self, df):
        """Method for writing in chunks. The first save operation creates/overwrites self.filename,
        subsequent calls append to the file
        """
        if self.save_iteration == 0:
            # Record the datetime columns and the index name, so loading can restore them
            self.date_cols = [k for k, v in zip(df.dtypes.index, df.dtypes.values)
                              if v == self.DATETIME_TYPE]
            self.indexname = df.index.name
            df.to_csv(self.filename, mode='w', header=True, index=True, index_label=self.indexname,
                      compression=self.compression)
            self._save_csvchunkedio()

        else:
            df.to_csv(self.filename, mode='a', header=False, index=True,
                      compression=self.compression)
        self.save_iteration += 1

    def load(self):
        """Returns the dataframe in its entirety"""
        df = pd.read_csv(self.filename, header=0, parse_dates=self.date_cols, index_col=self.indexname,
                         compression=self.compression)
        return df

    def load_chunks(self, chunksize=1000):
        """Returns a generator that yields chunks of chunksize rows"""
        return pd.read_csv(self.filename, header=0, parse_dates=self.date_cols, index_col=self.indexname,
                           compression=self.compression, chunksize=chunksize)

    def _save_csvchunkedio(self):
        """Private method. Saves a json to disk that contains the arguments to recreate this instance
        """
        args_dict = {'filename': self.filename, 'compression': self.compression,
                     'date_cols': self.date_cols, 'indexname': self.indexname}
        json_path = self.csv_to_json_mapper(self.filename)
        with open(json_path, 'w') as f:
            json.dump(args_dict, f)

    @classmethod
    def load_csvchunkedio(cls, csv_filepath):
        """Returns an instance of ChunkedCsv, reconstructed from the .json saved alongside the .csv
        """
        json_path = cls.csv_to_json_mapper(csv_filepath)
        with open(json_path, 'r') as f:
            args_dict = json.load(f)
        return cls(**args_dict)

Saving files

To demonstrate the chunked saving functionality, we read the dataframe in chunks and save it.

In [6]:
csv = ChunkedCsv(filename='test.csv')
t0 = time.perf_counter()
for _, chunk in df_chunk_generator(df):
    csv.save(chunk)
print('Saving took: {:.3f} s'.format(time.perf_counter() - t0))
Saving took: 12.539 s
In [7]:
csv_compressed = ChunkedCsv(filename='test_compressed.csv', compression='gzip')
t0 = time.perf_counter()
for _, chunk in df_chunk_generator(df):
    csv_compressed.save(chunk)
print('Saving compressed took: {:.3f} s'.format(time.perf_counter() - t0))
Saving compressed took: 22.615 s

Loading files

To read the data in a new Python session, we can instantiate an object of type ChunkedCsv using the classmethod load_csvchunkedio:

In [8]:
csvcio_loading = ChunkedCsv.load_csvchunkedio('test_compressed.csv')

The typical use case is to process the data chunk by chunk (a sketch of that pattern follows below). Here, we load the entire dataframe by concatenating the individual chunks.

In [14]:
df_loaded = pd.concat((chunk for chunk in csvcio_loading.load_chunks()))
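
As a sketch of the chunk-by-chunk pattern mentioned above (the running statistic is purely illustrative and not part of the original example), one can stream the chunks and accumulate results without ever holding the full DataFrame in memory:

In [ ]:
# Sketch: stream chunks and accumulate a statistic incrementally
total_sum, total_rows = 0.0, 0
for chunk in csvcio_loading.load_chunks(chunksize=100000):
    total_sum += chunk['float'].sum()
    total_rows += len(chunk)
print('Mean of the "float" column: {:.4f}'.format(total_sum / total_rows))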

Verify that the datatypes were preserved:

In [16]:
df_loaded.dtypes
Out[16]:
date     datetime64[ns]
text             object
int               int64
float           float64
dtype: object
In [17]:
# Concatenate and sample a few rows to visually compare both DataFrames
(pd.concat((df, df_loaded), axis=1)).sample(5)
Out[17]:
                date    text     int     float        date    text     int     float
my_index
173325    2019-01-06  text_i  172325  1.196846  2019-01-06  text_i  172325  1.196846
154032    2019-01-03  text_t  153032  0.175423  2019-01-03  text_t  153032  0.175423
374707    2019-01-28  text_x  373707  1.011416  2019-01-28  text_x  373707  1.011416
117691    2019-01-22  text_o  116691 -0.561825  2019-01-22  text_o  116691 -0.561825
962130    2019-01-21  text_d  961130 -0.101972  2019-01-21  text_d  961130 -0.101972
In [11]:
df.eq(df_loaded).sum()
Out[11]:
date     1000000
text     1000000
int      1000000
float     736183
dtype: int64

All columns are completely equal, except for "float", which differs in roughly a quarter of the rows. These mismatches are rounding errors: writing to .csv stores a rounded decimal representation of the underlying binary float, which does not always parse back to exactly the same value. The errors are, however, negligibly small for all practical purposes:

In [13]:
format((df.float - df_loaded.float).max(), '.3e')
Out[13]:
'8.882e-16'

So there we have it: a simple way to export larger-than-memory DataFrames chunk by chunk and read them back at a later stage without losing formatting. This functionality can be wrapped in a decorator to automate caching of expensive database queries, as sketched below.
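
As an illustration of that last point, here is a minimal sketch of such a caching decorator. It is not part of the class above: the decorator name, the cache-file convention and the assumption that the wrapped query function yields DataFrame chunks are all choices made for this example.

In [ ]:
def cache_chunked_query(filename):
    """Hypothetical decorator: persist the chunks yielded by an expensive query
    function via ChunkedCsv, and serve them from disk on subsequent calls."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Cache hit: the .json metadata exists, so reload from disk
            if os.path.exists(ChunkedCsv.csv_to_json_mapper(filename)):
                return ChunkedCsv.load_csvchunkedio(filename).load_chunks()
            # Cache miss: run the query and save its chunks as they arrive
            csv = ChunkedCsv(filename=filename)
            for chunk in func(*args, **kwargs):
                csv.save(chunk)
            return ChunkedCsv.load_csvchunkedio(filename).load_chunks()
        return wrapper
    return decorator

@cache_chunked_query('cached_query.csv')
def expensive_query():
    # Stand-in for a chunked database query
    for _, chunk in df_chunk_generator(df):
        yield chunk

The first call to expensive_query() would write cached_query.csv and its .json metadata; later calls skip the query entirely and return a chunk reader straight from disk.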
