NVIDIA DALIを使ってみた(DALI+TensorFlow編)

NVIDIA DALIとTensorFlowを組み合わせる方法を解説します。

DALI単体での使用方法はこちらを参照してください。

f:id:xterm256color:20180709010520j:plain

目次

環境

インストール方法

pip install --extra-index-url https://developer.download.nvidia.com/compute/redist nvidia-dali

1. 準備

今回は入力としてTFRecord形式を試しました。 TensorFlowが配布しているFlower PhotosデータセットをダウンロードしてTFRecords化しておきます。

git clone https://github.com/chmod644/dali-example.git
cd dali-example
./download_and_preprocess_flowers.sh flowers

2. TFRecordのPipeline定義

以下の一連の処理を行うPipelineクラスを定義します。

  • JPEGデコード
  • リサイズ
  • ランダムクロップ
import tensorflow as tf

from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
import nvidia.dali.tfrecord as tfrec
import nvidia.dali.plugin.tf as dali_tf


class TFRecordPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, tfrecord, tfrecord_idx):
        super(TFRecordPipeline, self).__init__(batch_size,
                                         num_threads,
                                         device_id)
        self.input = ops.TFRecordReader(
            path=tfrecord,
            index_path=tfrecord_idx,
            features={"image/encoded" : tfrec.FixedLenFeature((), tfrec.string, ""),
                      'image/class/label': tfrec.FixedLenFeature([1], tfrec.int64,  -1),
                      'image/class/text': tfrec.FixedLenFeature([ ], tfrec.string, ''),
                      })
        self.decode = ops.nvJPEGDecoder(device = "mixed", output_type = types.RGB)
        self.resize = ops.Resize(device = "gpu", resize_a = 256, resize_b = 256)
        self.cmnp = ops.CropMirrorNormalize(device = "gpu",
                                            output_dtype = types.FLOAT,
                                            crop = (224, 224),
                                            image_type = types.RGB,
                                            mean = [0., 0., 0.],
                                            std = [1., 1., 1.],
                                            output_layout=types.NHWC)
        self.uniform = ops.Uniform(range = (0.0, 1.0))
        self.iter = 0

    def define_graph(self):
        inputs = self.input()
        images = self.decode(inputs["image/encoded"])
        resized_images = self.resize(images)
        output = self.cmnp(resized_images, crop_pos_x = self.uniform(),
                           crop_pos_y = self.uniform())
        return (output, inputs["image/class/label"].gpu())

    def iter_setup(self):
        pass

3. PipelineからTensorFlowオペレータへの変換

注意点:

  • tfrecord2idxというコマンドでidxファイルを生成する必要があります。
  • PipelineをGPU毎に実体化したあとでserialize()を呼ぶ必要があることに注意してください。
import os
from subprocess import call

def inputs_dali(batch_size, devices, tfrecord):
    tfrecord_idx = os.path.splitext(tfrecord)[0] + '.idx'
    tfrecord2idx_script = "tfrecord2idx"

    if not os.path.isfile(tfrecord_idx):
        call([tfrecord2idx_script, tfrecord, tfrecord_idx])

    pipes = [
        TFRecordPipeline(
            batch_size=batch_size, num_threads=2, device_id=device_id,
            tfrecord=FLAGS.tfrecord, tfrecord_idx=tfrecord_idx) for device_id
        in range(devices)]

    serialized_pipes = [pipe.serialize() for pipe in pipes]
    del pipes

    daliop = dali_tf.DALIIterator()

    images = []
    labels = []
    for d in range(devices):
        with tf.device('/gpu:%i' % d):
            image, label = daliop(serialized_pipeline=serialized_pipes[d],
                                  batch_size=batch_size,
                                  height=224,
                                  width=224,
                                  device_id=d)
            images.append(image)
            labels.append(label)

    return images, labels

4. TensorFlowからの呼び出し

上で定義したinput_dali()の返り値はTensorFlowのTensor型(のリスト)になっているので、いつも通りsess.run()で評価することが出来ます。

import tensorflow as tf
import numpy as np
import time

# バッチサイズ
BATCH_SIZE = 16
# GPU枚数
DEVICES = 1
# TFRecordへのパス
PATH_TO_TFRECORD="./flowers/train-00000-of-00002" 


images, labels = inputs_dali(batch_size=batch_size, devices=devices, tfrecord=PATH_TO_TFRECORD)

gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.8)
config = tf.ConfigProto(allow_soft_placement=True, gpu_options=gpu_options)

with tf.Session(config=config) as sess:
    all_img_per_sec = []
    total_batch_size = BATCH_SIZE * DEVICES

    for i in range(ITERATIONS):
            start_time = time.time()

            # The actual run with our dali_tf tensors
            images_val, labels_val = sess.run([images, labels])

            elapsed_time = time.time() - start_time
            img_per_sec = total_batch_size / elapsed_time
            if i > BURNIN_STEPS:
                all_img_per_sec.append(img_per_sec)
                print("\t%7.1f img/s" % img_per_sec)

        print("Total average %7.1f img/s" % (sum(all_img_per_sec) / len(all_img_per_sec)))

DALI使用時・未使用時の速度比較

こちらのコードを使って、 DALIを使った場合とTensorFlowオペレータを使った場合を比較しました。

DALIがあるとき

Total average  1974.7 img/s

DALIがないとき*1

Total average   322.0 img/s

DALIを使うと6.13倍くらい速くなっています。DALIがないときのコードは適当に書いたのでもう少し速くなるかもしれませんが、それでも圧倒的にDALIが速いでしょう。

未解決・未検証の点

そのほか以下の点は執筆時点で未解決・未検証です。

  • ときどき CUDA error "out of memory" で落ちる → バッチサイズ小さくしないといけないかも。TensorFlowの下層側の実行速度やネットワークサイズに影響ありそうで怖い。
  • ラベルがfloatとして返ってくる→ issueにあげました
  • NCHWのときにTensorのshapeがNHWCになる → issueにあげました

参考URL

*1:tf.data.TFRecordDataset+tf.image.decode_jpeg+tf.image.resize_images+tf.random_cropを使いました