当前位置 : 主页 > 编程语言 > python >

Tensorflow 多线程设置方式

来源:互联网 收集:自由互联 发布时间:2021-04-09
一. 通过 ConfigProto 设置多线程 (具体参数功能及描述见 tensorflow/core/protobuf/config.proto) 在进行 tf.ConfigProto() 初始化时,可以通过设置相应的参数,来控制每个操作符 op 并行计算的线程

一. 通过 ConfigProto 设置多线程

(具体参数功能及描述见 tensorflow/core/protobuf/config.proto)

在进行 tf.ConfigProto() 初始化时,可以通过设置相应的参数,来控制每个操作符 op 并行计算的线程个数或 session 线程池的线程数。主要涉及的参数有以下三个:

1. intra_op_parallelism_threads 控制运算符op内部的并行
当运算符 op 为单一运算符,并且内部可以实现并行时,如矩阵乘法,reduce_sum 之类的操作,可以通过设置 intra_op_parallelism_threads 参数来并行。

2. inter_op_parallelism_threads 控制多个运算符op之间的并行计算
当有多个运算符 op,并且他们之间比较独立,运算符和运算符之间没有直接的路径 Path 相连。Tensorflow会尝试并行地计算他们,使用由 inter_op_parallelism_threads 参数来控制数量的一个线程池。
在第一次创建会话将设置将来所有会话的线程数,除非是配置了 session_inter_op_thread_pool 选项。

3. session_inter_op_thread_pool 配置会话线程池。
如果会话线程池的 num_threads 为 0,使用 inter_op_parallelism_threads 选项。

二. 通过队列进行数据读取时设置多线程

(具体函数功能及描述见 tensorflow/python/training/input.py)

1. 通过以下函数进行样本批处理时,可以通过设置 num_threads 来设置单个 Reader 多线程读取

1) batch(tensors, batch_size, num_threads=1, capacity=32,
enqueue_many=False, shapes=None, dynamic_pad=False,
allow_smaller_final_batch=False, shared_name=None, name=None)

2) maybe_batch(tensors, keep_input, batch_size, num_threads=1, capacity=32,
enqueue_many=False, shapes=None, dynamic_pad=False,
allow_smaller_final_batch=False, shared_name=None, name=None)

3) shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
num_threads=1, seed=None, enqueue_many=False, shapes=None,
allow_smaller_final_batch=False, shared_name=None, name=None)

4) maybe_shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
keep_input, num_threads=1, seed=None,
enqueue_many=False, shapes=None,
allow_smaller_final_batch=False, shared_name=None,
name=None)

例:

import tensorflow as tf 
 
 
filenames = ['A.csv', 'B.csv', 'C.csv'] 
# 生成一个先入先出队列和一个 QueueRunner,生成文件名队列 
filename_queue = tf.train.string_input_producer(filenames, shuffle=False) 
 
 
# 定义 Reader 和 Decoder
reader = tf.TextLineReader() 
key, value = reader.read(filename_queue) 
example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']])
 
 
# 使用tf.train.batch() 会为 graph 添加一个样本队列和一个 QueueRunner。 
# 经过 Reader 读取文件和 Decoder 解码后数据会进入这个队列,再批量出队。
# tf.train.batch() 这里只有一个 Reader,可以设置多线程
 
example_batch, label_batch = tf.train.batch([example, label], batch_size=5) 
 
 
with tf.Session() as sess: 
  coord = tf.train.Coordinator() 
  threads = tf.train.start_queue_runners(coord=coord) 
  for i in range(10): 
    e_val,l_val = sess.run([example_batch,label_batch]) 
    print e_val,l_val 
  coord.request_stop() 
  coord.join(threads) 

2. 通过以下函数进行样本批处理时,可以通过设置 Decoder 和 Reader 的个数来设置多 Reader 读取,其中每个 Reader 使用一个线程

1) batch_join(tensors_list, batch_size, capacity=32, enqueue_many=False,
shapes=None, dynamic_pad=False, allow_smaller_final_batch=False,
shared_name=None, name=None):

2) maybe_batch_join(tensors_list, keep_input, batch_size, capacity=32,
enqueue_many=False, shapes=None, dynamic_pad=False,
allow_smaller_final_batch=False, shared_name=None,
name=None)

3) shuffle_batch_join(tensors_list, batch_size, capacity,
min_after_dequeue, seed=None, enqueue_many=False,
shapes=None, allow_smaller_final_batch=False,
shared_name=None, name=None)

4) maybe_shuffle_batch_join(tensors_list, batch_size, capacity,
min_after_dequeue, keep_input, seed=None,
enqueue_many=False, shapes=None,
allow_smaller_final_batch=False, shared_name=None,
name=None)

例:

import tensorflow as tf 
 
 
filenames = ['A.csv', 'B.csv', 'C.csv'] 
# 生成一个先入先出队列和一个 QueueRunner,生成文件名队列
filename_queue = tf.train.string_input_producer(filenames, shuffle=False) 
 
 
# 定义 Reader
reader = tf.TextLineReader() 
key, value = reader.read(filename_queue) 
 
 
#定义了多个 Decoder, 每个 Decoder 跟一个 Reader 相连, 即有多个 Reader
example_list = [tf.decode_csv(value, record_defaults=[['null'], ['null']])
         for _ in range(2)] # Decoder 和 Reader 为 2
   
# 使用tf.train.batch_join() 会为 graph 添加一个样本队列和一个 QueueRunner。 
# 经过多个 Reader 读取文件和 Decoder 解码后数据会进入这个队列,再批量出队。   
# 使用 tf.train.batch_join(), 可以使用多个 Reader 并行读取数据。每个 Reader 使用一个线程
example_batch, label_batch = tf.train.batch_join(example_list, batch_size=5) 
with tf.Session() as sess: 
  coord = tf.train.Coordinator() 
  threads = tf.train.start_queue_runners(coord=coord) 
  for i in range(10): 
    e_val,l_val = sess.run([example_batch,label_batch]) 
    print e_val,l_val 
  coord.request_stop() 
  coord.join(threads) 

以上这篇Tensorflow 多线程设置方式就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持易盾网络。

网友评论