python-tensorflow Coordinate和QueueRunner配合管理队列-提取数据
发布日期:2021-11-05 07:52:18 浏览次数:35 分类:技术文章

本文共 5324 字,大约阅读时间需要 17 分钟。

tf.QueueRunner主要用于启动多个线程来操作同一个队列,启动的这些线程可以通过上面介绍的tf.Coordinator类来统一管理。以下代码展示如何使用tf.QueueRunner和tf.Coordinator来管理多线程队列操作。

第一种情况,直接从FIFOQueue队列中dequeue()数据

import tensorflow as tf # 声明一个先进先出的队列,队列中最多100个元素,类型为实数queue = tf .FIFOQueue(100, 'float')# 定义队列的入队操作enqueue_op = queue.enqueue([tf.random_normal([1])])# 使用 tf.train.QueueRunner来创建多个线程运行队列的入队操作# tf.train.QueueRunner给出了被操作的队列,[enqueue_op] * 5# 表示了需要启动5个线程,每个线程中运行的是enqueue_op操作qr = tf.train.QueueRunner(queue, [enqueue_op] * 5)# 将定义过的QueueRunner加入TensorFlow计算图上指定的集合# tf.train.add_queue_runner函数没有指定集合,# 则加入默认集合tf.GraphKeys.QUEUE_RUNNERS。# 下面的函数就是将刚刚定义的qr加入默认的tf.GraphKeys.QUEUE_RUNNERS结合tf.train.add_queue_runner(qr)# 定义出队操作out_tensor = queue.dequeue()with tf.Session() as sess:    # 使用tf.train.Coordinator来协同启动的线程    coord = tf.train.Coordinator()    # 使用tf.train.QueueRunner时,需要明确调用tf.train.start_queue_runners    # 来启动所有线程。否则因为没有线程运行入队操作,当调用出队操作时,程序一直等待    # 入队操作被运行。tf.train.start_queue_runners函数会默认启动    # tf.GraphKeys.QUEUE_RUNNERS中所有QueueRunner.因为这个函数只支持启动指定集合中的QueueRunner,    # 所以一般来说tf.train.add_queue_runner函数和tf.train.start_queue_runners函数会指定同一个结合    threads = tf.train.start_queue_runners(sess=sess, coord=coord)    # 获取队列中的取值    for _ in range(3): print(sess.run(out_tensor)[0])    # 使用tf.train.Coordinator来停止所有线程    coord.request_stop()    # 等待所有线程执行完毕    coord.join(threads)

-0.85880387

-0.97668344

2.1963

 

第二个例子

import tensorflow as tfimport threadingimport timequeue = tf.FIFOQueue(10, dtypes=[tf.float32])y = tf.Variable(initial_value=1.0, dtype=tf.float32)y_assign = tf.assign_add(y, 1)with tf.control_dependencies([y_assign]):    enque_op = queue.enqueue([y])# 声明创建5个线程,将线程管理器送到线程中deque_op = queue.dequeue()with tf.Session() as sess:    # 声明一个tf.train.Coordinator类来协同多个线程    coord = tf.train.Coordinator()    sess.run(tf.global_variables_initializer())    # threads = [threading.Thread(target=enque_data, args=(sess, enque_op, coord, id)) for id in range(5)]    # 创建包含1个线程的QueueRunner    qr = tf.train.QueueRunner(queue, [enque_op]*1)    threads = qr.create_threads(sess,start=True,coord=coord)    for i in range(10):        print(sess.run(deque_op))    coord.request_stop()    coord.join(threads)

输出:

 3.0

6.0
7.0
9.0
10.0
11.0
14.0
16.0
18.0
21.0

第二种情况,通过线程,批量从FIFOQueue队列中提取数据

只是在一的前提下,加上了

out_tensor = tf.train.batch(deque_op,batch_size=2,num_threads=5,capacity=64,allow_smaller_final_batch=False),

tf.train.batch是一个tensor队列生成器,作用是按照给定的tensor顺序,把batch_size个tensor推送到文件队列,作为训练一个batch的数据,等待tensor出队执行计算。

另外,tf.train.batch要求deque_op为可迭代对象,这里是列表

import tensorflow as tf # 声明一个先进先出的队列,队列中最多100个元素,类型为实数queue = tf .FIFOQueue(100, dtypes=(tf.float32,tf.float32),shapes=((1,),(1,)))# 定义队列的入队操作enqueue_op = queue.enqueue_many([[tf.random_normal([1])],[tf.random_normal([1])]])# 使用 tf.train.QueueRunner来创建多个线程运行队列的入队操作# tf.train.QueueRunner给出了被操作的队列,[enqueue_op] * 5# 表示了需要启动5个线程,每个线程中运行的是enqueue_op操作qr = tf.train.QueueRunner(queue, [enqueue_op] * 5)# 将定义过的QueueRunner加入TensorFlow计算图上指定的集合# tf.train.add_queue_runner函数没有指定集合,# 则加入默认集合tf.GraphKeys.QUEUE_RUNNERS。# 下面的函数就是将刚刚定义的qr加入默认的tf.GraphKeys.QUEUE_RUNNERS结合tf.train.add_queue_runner(qr)# 定义出队操作deque_op = queue.dequeue()# 下面想从队列中获取数据,但是不能从queue中直接获取,queue是队列,不可迭代out_tensor = tf.train.batch(deque_op,batch_size=2,num_threads=5,capacity=64,allow_smaller_final_batch=False) with tf.Session() as sess:    # 使用tf.train.Coordinator来协同启动的线程    coord = tf.train.Coordinator()    # 使用tf.train.QueueRunner时,需要明确调用tf.train.start_queue_runners    # 来启动所有线程。否则因为没有线程运行入队操作,当调用出队操作时,程序一直等待    # 入队操作被运行。tf.train.start_queue_runners函数会默认启动    # tf.GraphKeys.QUEUE_RUNNERS中所有QueueRunner.因为这个函数只支持启动指定集合中的QueueRunner,    # 所以一般来说tf.train.add_queue_runner函数和tf.train.start_queue_runners函数会指定同一个结合    threads = tf.train.start_queue_runners(sess=sess, coord=coord)    # 获取队列中的取值    for _ in range(1): print(sess.run(out_tensor))     # 使用tf.train.Coordinator来停止所有线程    coord.request_stop()    # 等待所有线程执行完毕    coord.join(threads)
[array([[-1.1871843],       [-0.4869249]], dtype=float32), array([[0.46282652],       [0.8103049 ]], dtype=float32)]

第三种情况,通过线程,批量从文件队列中提取数据 

slice_input_producer从tensor列表=====》读取数据到文件名队列

tf.train.batch从文件名队列====》读取数据到文件队列

import tensorflow as tf images = ['img1', 'img2', 'img3', 'img4', 'img5']labels= [1,2,3,4,5]epoch_num=8#构建文件名队列input_queue = tf.train.slice_input_producer([images, labels],num_epochs=None,shuffle=False) #images labels是tensor列表#构建文件队列,将文件名队列中数据提取到文件队列images_batch,labels_batch = tf.train.batch(input_queue,batch_size=2,num_threads=2,capacity=5) with tf.Session() as sess:    sess.run(tf.global_variables_initializer())        #启动线程管理器    coord = tf.train.Coordinator()    #启动文件名队列线程    threads = tf.train.start_queue_runners(sess=sess, coord=coord)        for i in range(epoch_num):        images_out,labels_out = sess.run([images_batch,labels_batch])        print('************************')        print (i,images_out,labels_out)     coord.request_stop()    coord.join(threads)
************************0 [b'img1' b'img2'] [1 2]************************1 [b'img3' b'img4'] [3 4]************************2 [b'img5' b'img1'] [5 1]************************3 [b'img2' b'img3'] [2 3]************************4 [b'img4' b'img5'] [4 5]************************5 [b'img1' b'img2'] [1 2]************************6 [b'img3' b'img4'] [3 4]************************7 [b'img5' b'img1'] [5 1]

转载地址:https://blog.csdn.net/xingghaoyuxitong/article/details/114656148 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:python-tensorflow Coordinate和QueueRunner配合管理队列-注入数据
下一篇:python-tensorflow Coordinator管理多线程

发表评论

最新留言

哈哈,博客排版真的漂亮呢~
[***.90.31.176]2024年04月06日 09时35分34秒