Data Reader

DataFeeder

class paddle.fluid.data_feeder.DataFeeder(feed_list, place, program=None)[source]

DataFeeder converts the data returned by a reader into a data structure that can be fed into Executor and ParallelExecutor. The reader usually returns a list of mini-batch data entries; each entry in the list is one sample, and each sample is a list or tuple with one or more features.

Simple usage is shown below:

import paddle.fluid as fluid
place = fluid.CPUPlace()
img = fluid.layers.data(name='image', shape=[1, 28, 28])
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
feeder = fluid.DataFeeder([img, label], fluid.CPUPlace())
result = feeder.feed([([0] * 784, [9]), ([1] * 784, [1])])

If you want to feed data to each GPU separately in advance when training a model on multiple GPUs, you can use the decorate_reader function.

import paddle
import paddle.fluid as fluid

place=fluid.CUDAPlace(0)
data = fluid.layers.data(name='data', shape=[3, 224, 224], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')

feeder = fluid.DataFeeder(place=place, feed_list=[data, label])
reader = feeder.decorate_reader(
        paddle.batch(paddle.dataset.flowers.train(), batch_size=16), multi_devices=False)
Parameters
  • feed_list (list) – The Variables, or the names of Variables, that will be fed into the model.

  • place (Place) – place indicates whether to feed the data to the CPU or a GPU. To feed data to a GPU, use fluid.CUDAPlace(i) (where i is the GPU id); to feed data to the CPU, use fluid.CPUPlace().

  • program (Program) – The Program that the data will be fed into. If program is None, default_main_program() is used. Default: None.

Raises

ValueError – If some Variable is not in this Program.

Examples

import numpy as np
import paddle
import paddle.fluid as fluid

place = fluid.CPUPlace()

def reader():
    # Yield a mini-batch containing one sample; the sample has two features,
    # matching data_1 (1*2*2 = 4 floats) and data_2 (1*1*3 = 3 floats).
    yield [np.random.random([4]).astype('float32'), np.random.random([3]).astype('float32')],

main_program = fluid.Program()
startup_program = fluid.Program()

with fluid.program_guard(main_program, startup_program):
    data_1 = fluid.layers.data(name='data_1', shape=[1, 2, 2])
    data_2 = fluid.layers.data(name='data_2', shape=[1, 1, 3])
    out = fluid.layers.fc(input=[data_1, data_2], size=2)
    # ...

feeder = fluid.DataFeeder([data_1, data_2], place)

exe = fluid.Executor(place)
exe.run(startup_program)
for data in reader():
    outs = exe.run(program=main_program,
                   feed=feeder.feed(data),
                   fetch_list=[out])
feed(iterable)

According to feed_list and iterable, this method converts the input into a data structure that can be fed into Executor and ParallelExecutor.

Parameters

iterable (list|tuple) – the input data.

Returns

the result of conversion.

Return type

dict

Examples

import numpy.random as random
import paddle.fluid as fluid

def reader(limit=5):
    for i in range(limit):
        yield (random.random([784]).astype('float32'),
               random.random([1]).astype('int64'),
               random.random([256]).astype('float32'))

data_1 = fluid.layers.data(name='data_1', shape=[1, 28, 28])
data_2 = fluid.layers.data(name='data_2', shape=[1], dtype='int64')
data_3 = fluid.layers.data(name='data_3', shape=[16, 16], dtype='float32')
feeder = fluid.DataFeeder(['data_1','data_2', 'data_3'], fluid.CPUPlace())

result = feeder.feed(reader())
feed_parallel(iterable, num_places=None)

Takes multiple mini-batches. Each mini-batch will be fed to a separate device in advance.

Parameters
  • iterable (list|tuple) – the input data.

  • num_places (int) – the number of devices. Default None.

Returns

a generator that yields one converted dict per device.

Return type

generator

Notes

The number of devices and the number of mini-batches must be the same.

Examples

import numpy.random as random
import paddle.fluid as fluid

def reader(limit=10):
    for i in range(limit):
        # Each yield is a mini-batch containing a single (x, y) sample.
        yield [random.random([784]).astype('float32'), random.randint(10)],

x = fluid.layers.data(name='x', shape=[1, 28, 28])
y = fluid.layers.data(name='y', shape=[1], dtype='int64')

feeder = fluid.DataFeeder(['x','y'], fluid.CPUPlace())
place_num = 2
places = [fluid.CPUPlace() for x in range(place_num)]
data = []
exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
program = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(places=places)
for item in reader():
    data.append(item)
    if place_num == len(data):
        exe.run(program=program, feed=list(feeder.feed_parallel(data, place_num)), fetch_list=[])
        data = []
decorate_reader(reader, multi_devices, num_places=None, drop_last=True)

Converts the data returned by reader into multiple mini-batches; each mini-batch will be fed to a separate device.

Parameters
  • reader (function) – the function that generates the data to read.

  • multi_devices (bool) – whether to use multiple devices or not.

  • num_places (int) – if multi_devices is True, you can specify the number of GPUs to use; if num_places is None, the function will use all the GPUs of the current machine. Default: None.

  • drop_last (bool) – whether to drop the last batch if the size of the last batch is less than batch_size. Default True.

Returns

the decorated reader, which yields batched data suitable for feeding to the devices.

Return type

generator

Raises

ValueError – If drop_last is False and the data batches cannot be evenly distributed to the devices.

Examples

import numpy.random as random
import paddle
import paddle.fluid as fluid

def reader(limit=5):
    for i in range(limit):
        # Each yield is a mini-batch containing one (data, label) sample.
        yield (random.random([784]).astype('float32'), random.random([1]).astype('int64')),

place=fluid.CUDAPlace(0)
data = fluid.layers.data(name='data', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')

feeder = fluid.DataFeeder(place=place, feed_list=[data, label])
reader = feeder.decorate_reader(reader, multi_devices=False)

exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
for data in reader():
    exe.run(feed=data)

Reader

At training and testing time, PaddlePaddle programs need to read data. To ease users' work of writing data-reading code, we define the following:

  • A reader is a function that reads data (from file, network, random number generator, etc) and yields data items.

  • A reader creator is a function that returns a reader function.

  • A reader decorator is a function that accepts one or more readers and returns a reader.

  • A batch reader is a function that reads data (from reader, file, network, random number generator, etc) and yields a batch of data items.
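
For instance, paddle.batch (used in the DataFeeder examples above) is a reader decorator that turns a sample-level reader into a batch reader. A minimal sketch, assuming a trivial reader:

import paddle

def reader():
    for i in range(10):
        yield i

# batch_reader yields [0, 1, 2], [3, 4, 5], [6, 7, 8], [9] instead of single items.
batch_reader = paddle.batch(reader, batch_size=3)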

Data Reader Interface

In fact, a data reader doesn't have to be a function that reads and yields data items. It can be any function with no parameters that creates an iterable (anything that can be used in for x in iterable):

iterable = data_reader()

Each element produced by the iterable should be a single entry of data, not a mini-batch. The entry could be a single item or a tuple of items. Items should be of a supported type (e.g., a numpy array, or a list/tuple of floats or ints).

An example implementation of a single-item data reader creator:

import numpy

def reader_creator_random_image(width, height):
    def reader():
        while True:
            yield numpy.random.uniform(-1, 1, size=width*height)
    return reader

An example implementation of a multiple-item data reader creator:

def reader_creator_random_image_and_label(width, height, label):
    def reader():
        while True:
            yield numpy.random.uniform(-1, 1, size=width*height), label
    return reader
paddle.reader.cache(reader)[source]

Cache the reader's data in memory.

Be careful: this method may take a long time and consume a lot of memory, because reader() is called only once and all of its output is stored.

Parameters

reader (callable) – a reader function that yields data each time it is called.

Returns

a decorated reader object which yields data from cached memory.

Return type

generator
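
Examples

A minimal usage sketch, assuming a trivial reader:

import paddle

def reader():
    for i in range(100):
        yield i

# The wrapped reader is consumed only once; every pass over
# cached_reader replays the data kept in memory.
cached_reader = paddle.reader.cache(reader)
for item in cached_reader():
    pass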

paddle.reader.map_readers(func, *readers)[source]

Creates a data reader that outputs the return value of func, using the outputs of the input readers as its arguments.

Parameters
  • func (callable) – the function to use. The type of func should be (Sample) => Sample.

  • readers – readers whose outputs will be used as arguments of func.

Returns

the created data reader.

Return type

callable
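
Examples

A short sketch mapping over two hypothetical readers:

import paddle

def reader_a():
    for i in range(3):
        yield i

def reader_b():
    for i in range(3):
        yield i * 10

# summed yields func(a, b) for each pair of outputs: 0, 11, 22.
summed = paddle.reader.map_readers(lambda a, b: a + b, reader_a, reader_b)
for item in summed():
    print(item)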

paddle.reader.buffered(reader, size)[source]

Creates a buffered data reader.

The buffered data reader will read and save data entries into a buffer. Reading from the buffered data reader will proceed as long as the buffer is not empty.

Parameters
  • reader (callable) – the data reader to read from.

  • size (int) – max buffer size.

Returns

the buffered data reader.
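
Examples

A minimal sketch; the reader and the buffer size are illustrative:

import paddle

def reader():
    for i in range(100):
        yield i

# Up to 10 entries are read ahead into an internal buffer.
buffered_reader = paddle.reader.buffered(reader, size=10)
for item in buffered_reader():
    pass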

paddle.reader.compose(*readers, **kwargs)[source]

Creates a data reader whose output is the combination of input readers.

If the input readers output the following data entries: (1, 2), 3, (4, 5), then the composed reader will output: (1, 2, 3, 4, 5).

Parameters
  • readers – readers that will be composed together.

  • check_alignment (bool) – if True, check whether the input readers are aligned correctly; if False, do not check alignment, and trailing outputs will be discarded. Defaults to True.

Returns

the new data reader.

Raises

ComposeNotAligned – outputs of readers are not aligned. Will not raise when check_alignment is set to False.
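
Examples

A sketch composing two hypothetical readers:

import paddle

def reader_a():
    for i in range(3):
        yield (i, i + 1)

def reader_b():
    for i in range(3):
        yield i * 10

# composed yields flattened tuples: (0, 1, 0), (1, 2, 10), (2, 3, 20).
composed = paddle.reader.compose(reader_a, reader_b)
for item in composed():
    print(item)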

paddle.reader.chain(*readers)[source]

Creates a data reader whose output is the outputs of input data readers chained together.

If the input readers output the following data entries: [0, 0, 0], [1, 1, 1], [2, 2, 2], then the chained reader will output: [0, 0, 0, 1, 1, 1, 2, 2, 2].

Parameters

readers – input readers.

Returns

the new data reader.

Return type

callable
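
Examples

A sketch chaining two hypothetical readers:

import paddle

def reader_a():
    for i in range(3):
        yield i

def reader_b():
    for i in range(3):
        yield i + 100

# chained yields 0, 1, 2, 100, 101, 102.
chained = paddle.reader.chain(reader_a, reader_b)
for item in chained():
    print(item)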

paddle.reader.shuffle(reader, buf_size)[source]

Creates a data reader whose data output is shuffled.

Output from the iterator created by the original reader is buffered into a shuffle buffer and then shuffled. The size of the shuffle buffer is determined by the argument buf_size.

Parameters
  • reader (callable) – the original reader whose output will be shuffled.

  • buf_size (int) – shuffle buffer size.

Returns

the new reader whose output is shuffled.

Return type

callable
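
Examples

A minimal sketch; the reader and buf_size are illustrative:

import paddle

def reader():
    for i in range(10):
        yield i

# Entries are collected into a buffer of up to 4 items, shuffled,
# and then yielded in the shuffled order.
shuffled = paddle.reader.shuffle(reader, buf_size=4)
for item in shuffled():
    print(item)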

exception paddle.reader.ComposeNotAligned[source]

Raised by compose when the outputs of the input readers are not aligned and check_alignment is True.

paddle.reader.firstn(reader, n)[source]

Limit the max number of samples that the reader can return.

Parameters
  • reader (callable) – the data reader to read from.

  • n (int) – the max number of samples to return.

Returns

the decorated reader.

Return type

callable
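
Examples

A minimal sketch with a trivial reader:

import paddle

def reader():
    for i in range(100):
        yield i

# limited yields only the first 5 samples: 0, 1, 2, 3, 4.
limited = paddle.reader.firstn(reader, 5)
for item in limited():
    print(item)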

paddle.reader.xmap_readers(mapper, reader, process_num, buffer_size, order=False)[source]

Use multiple threads to map samples from the reader with a user-defined mapper.

Parameters
  • mapper (callable) – a function to map the data from reader.

  • reader (callable) – a data reader which yields the data.

  • process_num (int) – the number of threads used to process the original samples.

  • buffer_size (int) – the size of the queue used to buffer the data.

  • order (bool) – whether to keep the data order from original reader. Default False.

Returns

a decorated reader with data mapping.

Return type

callable
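
Examples

A sketch with a hypothetical mapper and reader; process_num and buffer_size are arbitrary:

import paddle

def mapper(sample):
    # Stands in for an expensive per-sample transformation, e.g. image decoding.
    return sample * 2

def reader():
    for i in range(16):
        yield i

# Four threads apply mapper to the samples; order=True preserves input order.
mapped = paddle.reader.xmap_readers(mapper, reader, process_num=4,
                                    buffer_size=8, order=True)
for item in mapped():
    print(item)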

class paddle.reader.PipeReader(command, bufsize=8192, file_type='plain')[source]

PipeReader reads data streamed from a command, takes its stdout into a pipe buffer, redirects it to a parser to parse, and then yields data in your desired format.

You can use a standard Linux command or call another program to read data, e.g., from HDFS, Ceph, a URL, or AWS S3.

An example:

def example_reader(myfiles):
    # myfiles is a list of file paths to read from.
    for f in myfiles:
        pr = PipeReader("cat %s" % f)
        for l in pr.get_line():
            sample = l.split(" ")
            yield sample
get_line(cut_lines=True, line_break='\n')
Parameters
  • cut_lines (bool) – whether to cut the buffer into lines.

  • line_break (string) – the line-break character used in the file, such as '\n' or '\r'.

Returns

one line or a buffer of bytes

Return type

string

paddle.reader.multiprocess_reader(readers, use_pipe=True, queue_size=1000)[source]

multiprocess_reader uses Python multiprocessing to read data from the readers in parallel and then uses multiprocessing.Queue or multiprocessing.Pipe to merge all the data. The number of processes equals the number of input readers; each process calls one reader.

multiprocessing.Queue requires read/write access to /dev/shm, which some platforms do not support.

You need to create the readers first; they should be independent of each other so that each process can work on its own.

An example:

reader0 = reader(["file01", "file02"])
reader1 = reader(["file11", "file12"])
reader2 = reader(["file21", "file22"])
reader = multiprocess_reader([reader0, reader1, reader2],
    queue_size=100, use_pipe=False)
class paddle.reader.Fake[source]

A Fake reader caches the first data item it reads and yields it data_num times. It is used to cache one item from a real reader and replay it for speed testing.

Parameters
  • reader – the original reader.

  • data_num – the number of times this reader will yield the cached data.

Returns

a fake reader.

Examples

def reader():
    for i in range(10):
        yield i

fake_reader = Fake()(reader, 100)

The creator package contains some simple reader creators, which can be used in user programs.

paddle.reader.creator.np_array(x)[source]

Creates a reader that yields the elements of x if it is a numpy vector, the rows of x if it is a numpy matrix, or, more generally, the sub-arrays indexed along the highest dimension.

Parameters

x – the numpy array to create reader from.

Returns

data reader created from x.
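
Examples

A minimal sketch:

import numpy as np
import paddle

mat = np.array([[1, 2], [3, 4]], dtype='float32')

# For a 2-D array the reader yields one row at a time:
# [1., 2.] and then [3., 4.].
row_reader = paddle.reader.creator.np_array(mat)
for row in row_reader():
    print(row)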

paddle.reader.creator.text_file(path)[source]

Creates a data reader that outputs text line by line from the given text file. The trailing newline ('\n') of each line will be removed.

Parameters

path (str) – path of the text file.

Returns

data reader of text file.

Return type

callable
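
Examples

A minimal sketch; 'words.txt' stands for a path to an existing text file:

import paddle

line_reader = paddle.reader.creator.text_file('words.txt')

# Each iteration yields one line with the trailing '\n' removed.
for line in line_reader():
    print(line)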

paddle.reader.creator.recordio(paths, buf_size=100)[source]

Creates a data reader from the given RecordIO file paths, separated by ','; glob patterns are supported.

Parameters
  • paths (str|list(str)) – path of recordio files.

  • buf_size (int) – prefetched buffer size.

Returns

data reader of recordio files.

Return type

callable
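
Examples

A minimal sketch; 'data.recordio' stands for a path to an existing RecordIO file:

import paddle

rio_reader = paddle.reader.creator.recordio('data.recordio', buf_size=100)
for sample in rio_reader():
    pass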