YTEP-0001: IO Chunking

Abstract

Created: November 26, 2012
Author: Matthew Turk

IO in yt 2.x has always been batched by grids. This YTEP describes a new method, which allows a selection of keywords (‘spatial’, ‘all’, ‘io’) to describe methods of IO that are then left to the frontend or geometry handler to implement. This way, the frontend is able to decide how to access data without any prescriptions on how it should be accessed.

Status

In-Progress: This has been largely implemented for grid and oct geometries in yt-3.0.

Detailed Description

Background

“Chunking” in this section refers to the loading of data off disk in bulk. For traditional frontends in yt, this has been in the form of grids: either singly or in bulk, grids have been loaded off disk. When Derived Quantities want to handle individual grids, one at a time, they “preload” the data from whatever grids the ParallelAnalysisInterface thinks they deserve. These grids are iterated over and handled individually, and the result is combined at the end. Profiles do something similar. However, both of these mechanisms are de facto rather than designed: they rely on calls to semi-private functions on data objects, manual masking of data, and so on.

An explicit method of data chunking that relies on the characteristics of the desired chunks, rather than the means of the chunking, is needed to bypass this reliance on the grid mediation of IO. In this method, data objects will request that the geometry handler supply a set of chunks. Chunks are of the form (IO_unit, size), where IO_unit is only ever managed or handled by _read_selection. This allows the information about all types of IO and collections of data to live internal to the individual implementations of GeometryHandler objects. This way, Grids can still batch based on Grid information, but this abstraction is not needed for Octree IO.
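
As a rough illustration (not yt's actual YTDataChunk class), a chunk can be pictured as nothing more than an opaque IO unit paired with a size; the class and attribute names below are hypothetical:

class SimpleChunk(object):
    # Hypothetical stand-in for a chunk: an opaque IO unit paired with its
    # size. Only the frontend's _read_selection ever interprets io_unit;
    # data objects treat it as opaque.
    def __init__(self, io_unit, size):
        self.io_unit = io_unit
        self.size = size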

Note that YTEP-0017 redefines GeometryHandler to Index – this reflects the fact that the process of data selection and IO is better thought of as a process of indexing, and that any subsequent operations should be conducted at a higher level.

Main Changes

  • Data objects no longer have a _grids attribute.
  • Parallelism is restructured to iterate over chunks (decided on by the geometry handler) rather than grids.
  • Grids do not exist outside of the grid geometry handler.
  • To specifically break backwards compatibility, a blocks property has been added which will iterate and yield block-like data (i.e., grids) and the mask of the blocks. This should encompass the use cases of iterating over the _grids attribute, obtaining the mask that selects points inside a region, and having 3D data available (see the sketch after this list). This should be used exceedingly rarely, but it will be implemented for all types of data. All routines that call upon _grids directly must be updated to use blocks.
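
As a rough sketch of the intended usage (the exact behavior of the yielded objects is not specified here and may differ in the final implementation), iterating over blocks might look like this:

# Hypothetical usage sketch of the blocks property.
for block, mask in my_obj.blocks:
    # block behaves like a 3D, grid-like data container
    density = block["Density"]
    # mask selects only the points of the block that fall inside my_obj
    selected = density[mask]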

Implementation

The chunking system is implemented in a geometry handler through several functions. The GeometryHandler class needs to have the following routines implemented (a schematic sketch follows the list):

  • _identify_base_chunk(self, dobj): this routine must set the _current_chunk attribute on dobj to be equal to a chunk that represents the full selection of data for that data object. This is the “base” chunk from which other chunks will be subselected.
  • _count_selection(self, dobj, sub_objects): this must return the number of selected cells within a given data object, across the supplied sub_objects.
  • _chunk_io(self, dobj): this function should yield a series of YTDataChunk objects that have been ordered and created to consolidate IO.
  • _chunk_spatial(self, dobj, ngz, sort = None, preload_fields = None): this should yield a series of YTDataChunk objects which have been created to allow for spatial access of the data. For grids, this means 3D objects, and for Octs the behavior is undefined but should be 3D or possibly a string of 3D objects. This is where ghost zone generation will occur, although that has not yet been implemented. Optionally, the chunk request can also provide a “hint” to the chunking system of which fields will be necessary. This is discussed below.
  • _chunk_all(self, dobj): this should yield a single chunk that contains the entire data object.
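
A minimal, schematic skeleton of a geometry handler implementing these hooks might look like the following; the import path, the YTDataChunk constructor arguments, and the helper methods (_identify_io_units, _batch_by_file) are assumptions made for illustration rather than the actual yt implementation:

# Import path assumed from the yt-3.0 layout; it may differ.
from yt.geometry.geometry_handler import GeometryHandler, YTDataChunk

class ExampleGeometryHandler(GeometryHandler):
    # Schematic sketch only: constructor arguments and helpers are
    # illustrative assumptions.

    def _identify_base_chunk(self, dobj):
        # Record the full selection for this data object as the base chunk.
        objs = self._identify_io_units(dobj)  # hypothetical helper
        dobj._current_chunk = YTDataChunk(
            dobj, "all", objs, self._count_selection(dobj, objs))

    def _count_selection(self, dobj, sub_objects):
        # Return the number of selected cells across the IO units.
        return sum(obj.count(dobj.selector) for obj in sub_objects)

    def _chunk_all(self, dobj):
        # A single chunk spanning the entire data object.
        yield dobj._current_chunk

    def _chunk_io(self, dobj):
        # Group IO units (e.g., grids that share a file) to consolidate IO.
        for batch in self._batch_by_file(dobj):  # hypothetical helper
            yield YTDataChunk(
                dobj, "io", batch, self._count_selection(dobj, batch))

    def _chunk_spatial(self, dobj, ngz, sort=None, preload_fields=None):
        # One chunk per spatial unit, so fields can be accessed as 3D data.
        for obj in dobj._current_chunk.objs:
            yield YTDataChunk(
                dobj, "spatial", [obj], self._count_selection(dobj, [obj]))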

The only place that YTDataChunk objects will ever be directly queried is inside the _read_fluid_selection and _read_particle_selection routines, which are implemented by the geometry handler itself. This means that the chunks can be completely opaque external to the geometry handlers.

To start the chunks shuffling over the output, the code calls data_source.chunks(fields, chunking_style). Right now only "spatial", "io" and "all" are supported as chunking styles. These correspond to spatially-oriented division, IO-conserving division, and all-at-once access (not usually relevant). The chunks function looks like this:

def chunks(self, fields, chunking_style, **kwargs):
    # Ask the geometry handler for chunks of the requested style.
    for chunk in self.hierarchy._chunk(self, chunking_style, **kwargs):
        # Within this context, the data object's size and current data
        # source temporarily reflect the current chunk.
        with self._chunked_read(chunk):
            self.get_data(fields)
            yield self

Note what it does here – it actually yields itself. However, inside the _chunked_read context manager, the attributes corresponding to the size, the current data source, and so on are set by the geometry handler (still called a hierarchy here). So, for instance, execution might look like this:

for ds in my_obj.chunks(["Density"], "spatial"):
    print ds is my_obj
    print ds["Density"].size

The first line will actually print True, but the results from the second one will be the size of (for instance) the grid it’s currently iterating over. In this way, it becomes much easier to stride over subsets of data. Derived quantities now look like this:

chunks = self._data_source.chunks([], chunking_style="io")
for ds in parallel_objects(chunks, -1):
    rv = self.func(ds, *args, **kwargs)

It chunks the data off disk, evaluates the function on each chunk, and then stores the intermediate results.
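
For example, a total-mass style quantity might accumulate one intermediate value per chunk and combine them afterwards. The sketch below is hypothetical: the field name and final reduction are illustrative, and the cross-processor reduction a parallel run would need is omitted for brevity.

# Hypothetical sketch of the chunk-then-combine pattern used by
# derived quantities; not the actual derived quantity machinery.
def total_mass(data_source):
    chunks = data_source.chunks([], chunking_style="io")
    intermediate = []
    for ds in parallel_objects(chunks, -1):
        # Only the current chunk's data is resident while this executes.
        intermediate.append(ds["CellMassMsun"].sum())
    # Combine the per-chunk intermediate results.
    return sum(intermediate)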

This is not meant to replace spatial decomposition in parallel jobs, but it is designed to enable much easier and mesh-neutral division of labor for parallelism and for IO. If we were to call chunks on an octree, it no longer has to make things look like grids; it just makes them look like flattened arrays (unless you chunk over spatial, which I haven’t gotten into yet).

Essentially, by making the method of subsetting and striding over subsetted data more compartmentalized, the code becomes clearer and more maintainable.

Field Preloading

A common problem with the current chunking system is preloading data for spatial fields. For instance, inside the field generation system, this construction is used:

for io_chunk in self.chunks([], "io"):
    for i, chunk in enumerate(self.chunks(field, "spatial", ngz = 0)):
        ...  # generate the field for this spatial chunk

At this point in the system, a single field is being generated, and all of the dependencies for that field could be calculated using _identify_field_dependencies, but this is not done. The chunking will first break into IO chunks, and then iterate over spatial chunks within each IO chunk. This results in IO not being conducted on the IO chunks, but instead on each individual spatial chunk. For octree datasets, this is not typically that bad, as a spatial chunk there can consist of many items. However, for patch-based datasets (particularly Enzo and the current FLASH implementation) this results in far more fine-grained IO access than we want. As an example, this would not allow any batching of IO inside HDF5 files, despite the spatial data already being accessed in the appropriate order. When depositing particles in Enzo, for instance, this results in a separate access to every grid for each particle deposition operation.

For non-spatial fields, IO chunking is typically quite effective and appropriate for patch datasets.

To remedy this, we need to construct a language for preloading within an IO chunk. This would necessitate the creation of a _field_cache attribute on DataContainer, which would be populated inside the _chunk_io loop if hinting is available. _read_fluid_selection and _read_particle_selection would then inspect the chunk they are passed, and for any requested fields that are inside the _field_cache dict (or dict subclass), the cached values would be returned. This is managed by the _activate_cache method.

This would change the loop above to look something like this:

field_deps = self._identify_dependencies(field)
for io_chunk in self.chunks([], "io"):
    for i, chunk in enumerate(self.chunks(field, "spatial", ngz = 0,
                                          preload_fields = field_deps)):
        ...  # generate the field for this spatial chunk

This should result in much more efficient IO operations, as IO for spatial fields will be able to be consolidated. As they are currently implemented, Octrees would likely not need this improvement, so it will not be implemented for them. However, all frontends may ultimately benefit from this, as it could trivially be extended to keep all data resident in memory for situations where many passes over a small amount of data are necessary.
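
A rough sketch of how the cache might be consulted inside the IO path follows; the method signature shown, the way the cache is reached through the chunk, and the _read_from_disk helper are assumptions for illustration only:

# Hypothetical sketch of cache consultation inside _read_fluid_selection.
def _read_fluid_selection(self, chunks, selector, fields, size):
    chunks = list(chunks)
    # Assume the cache on the data object can be reached through the chunk.
    cache = getattr(chunks[0].dobj, "_field_cache", None) if chunks else None
    results = {}
    remaining = []
    for field in fields:
        if cache is not None and field in cache:
            # Populated during _chunk_io when preload_fields hints were given.
            results[field] = cache[field]
        else:
            remaining.append(field)
    # Read any non-cached fields from disk as usual (hypothetical helper).
    results.update(self._read_from_disk(chunks, selector, remaining, size))
    return results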

Backwards Compatibility

This system changes how data objects access data, and so this may ultimately result in differences in results (due to floating point error). Additionally, any code that relies on access of the _grids attribute on data objects will be broken.

All Octree code will need to be updated for 3.0. All frontends for grids will need to be updated, as this requires somewhat different IO systems to be in place. Updating the grid patch handling will require minimal code change.

Ghost zones have been implemented, but will require further study to ensure that the results are being calculated correctly. Support for fields that require ghost zones is progressing.

To accommodate situations where data objects or processing routines (not derived fields) require information about the shape, connectivity, and masking of data, a blocks attribute has been implemented. This attribute will yield masks of data and 3D-shaped data containers, enabling most routines that previously relied on grids to work. By focusing on blocks of data rather than grids, we emphasize that these may be of any size, and may also be generated rather than inherent to the output data.

Alternatives

The main alternative for this would be to grid all data, as is done in 2.x. I believe this is not sustainable.