python-tensorflow Coordinate和QueueRunner配合管理队列-提取数据
发布日期:2021-11-05 07:52:18
浏览次数:35
分类:技术文章
本文共 5324 字,大约阅读时间需要 17 分钟。
tf.QueueRunner主要用于启动多个线程来操作同一个队列,启动的这些线程可以通过上面介绍的tf.Coordinator类来统一管理。以下代码展示如何使用tf.QueueRunner和tf.Coordinator来管理多线程队列操作。
第一种情况,直接从FIFOQueue队列中dequeue()数据
import tensorflow as tf # 声明一个先进先出的队列,队列中最多100个元素,类型为实数queue = tf .FIFOQueue(100, 'float')# 定义队列的入队操作enqueue_op = queue.enqueue([tf.random_normal([1])])# 使用 tf.train.QueueRunner来创建多个线程运行队列的入队操作# tf.train.QueueRunner给出了被操作的队列,[enqueue_op] * 5# 表示了需要启动5个线程,每个线程中运行的是enqueue_op操作qr = tf.train.QueueRunner(queue, [enqueue_op] * 5)# 将定义过的QueueRunner加入TensorFlow计算图上指定的集合# tf.train.add_queue_runner函数没有指定集合,# 则加入默认集合tf.GraphKeys.QUEUE_RUNNERS。# 下面的函数就是将刚刚定义的qr加入默认的tf.GraphKeys.QUEUE_RUNNERS结合tf.train.add_queue_runner(qr)# 定义出队操作out_tensor = queue.dequeue()with tf.Session() as sess: # 使用tf.train.Coordinator来协同启动的线程 coord = tf.train.Coordinator() # 使用tf.train.QueueRunner时,需要明确调用tf.train.start_queue_runners # 来启动所有线程。否则因为没有线程运行入队操作,当调用出队操作时,程序一直等待 # 入队操作被运行。tf.train.start_queue_runners函数会默认启动 # tf.GraphKeys.QUEUE_RUNNERS中所有QueueRunner.因为这个函数只支持启动指定集合中的QueueRunner, # 所以一般来说tf.train.add_queue_runner函数和tf.train.start_queue_runners函数会指定同一个结合 threads = tf.train.start_queue_runners(sess=sess, coord=coord) # 获取队列中的取值 for _ in range(3): print(sess.run(out_tensor)[0]) # 使用tf.train.Coordinator来停止所有线程 coord.request_stop() # 等待所有线程执行完毕 coord.join(threads)
-0.85880387
-0.97668344
2.1963
第二个例子
import tensorflow as tfimport threadingimport timequeue = tf.FIFOQueue(10, dtypes=[tf.float32])y = tf.Variable(initial_value=1.0, dtype=tf.float32)y_assign = tf.assign_add(y, 1)with tf.control_dependencies([y_assign]): enque_op = queue.enqueue([y])# 声明创建5个线程,将线程管理器送到线程中deque_op = queue.dequeue()with tf.Session() as sess: # 声明一个tf.train.Coordinator类来协同多个线程 coord = tf.train.Coordinator() sess.run(tf.global_variables_initializer()) # threads = [threading.Thread(target=enque_data, args=(sess, enque_op, coord, id)) for id in range(5)] # 创建包含1个线程的QueueRunner qr = tf.train.QueueRunner(queue, [enque_op]*1) threads = qr.create_threads(sess,start=True,coord=coord) for i in range(10): print(sess.run(deque_op)) coord.request_stop() coord.join(threads)
输出:
3.0
6.0 7.0 9.0 10.0 11.0 14.0 16.0 18.0 21.0第二种情况,通过线程,批量从FIFOQueue队列中提取数据
只是在一的前提下,加上了
out_tensor = tf.train.batch(deque_op,batch_size=2,num_threads=5,capacity=64,allow_smaller_final_batch=False),
tf.train.batch是一个tensor队列生成器,作用是按照给定的tensor顺序,把batch_size个tensor推送到文件队列,作为训练一个batch的数据,等待tensor出队执行计算。
另外,tf.train.batch要求deque_op为可迭代对象,这里是列表import tensorflow as tf # 声明一个先进先出的队列,队列中最多100个元素,类型为实数queue = tf .FIFOQueue(100, dtypes=(tf.float32,tf.float32),shapes=((1,),(1,)))# 定义队列的入队操作enqueue_op = queue.enqueue_many([[tf.random_normal([1])],[tf.random_normal([1])]])# 使用 tf.train.QueueRunner来创建多个线程运行队列的入队操作# tf.train.QueueRunner给出了被操作的队列,[enqueue_op] * 5# 表示了需要启动5个线程,每个线程中运行的是enqueue_op操作qr = tf.train.QueueRunner(queue, [enqueue_op] * 5)# 将定义过的QueueRunner加入TensorFlow计算图上指定的集合# tf.train.add_queue_runner函数没有指定集合,# 则加入默认集合tf.GraphKeys.QUEUE_RUNNERS。# 下面的函数就是将刚刚定义的qr加入默认的tf.GraphKeys.QUEUE_RUNNERS结合tf.train.add_queue_runner(qr)# 定义出队操作deque_op = queue.dequeue()# 下面想从队列中获取数据,但是不能从queue中直接获取,queue是队列,不可迭代out_tensor = tf.train.batch(deque_op,batch_size=2,num_threads=5,capacity=64,allow_smaller_final_batch=False) with tf.Session() as sess: # 使用tf.train.Coordinator来协同启动的线程 coord = tf.train.Coordinator() # 使用tf.train.QueueRunner时,需要明确调用tf.train.start_queue_runners # 来启动所有线程。否则因为没有线程运行入队操作,当调用出队操作时,程序一直等待 # 入队操作被运行。tf.train.start_queue_runners函数会默认启动 # tf.GraphKeys.QUEUE_RUNNERS中所有QueueRunner.因为这个函数只支持启动指定集合中的QueueRunner, # 所以一般来说tf.train.add_queue_runner函数和tf.train.start_queue_runners函数会指定同一个结合 threads = tf.train.start_queue_runners(sess=sess, coord=coord) # 获取队列中的取值 for _ in range(1): print(sess.run(out_tensor)) # 使用tf.train.Coordinator来停止所有线程 coord.request_stop() # 等待所有线程执行完毕 coord.join(threads)
[array([[-1.1871843], [-0.4869249]], dtype=float32), array([[0.46282652], [0.8103049 ]], dtype=float32)]
第三种情况,通过线程,批量从文件队列中提取数据
slice_input_producer从tensor列表=====》读取数据到文件名队列
tf.train.batch从文件名队列====》读取数据到文件队列
import tensorflow as tf images = ['img1', 'img2', 'img3', 'img4', 'img5']labels= [1,2,3,4,5]epoch_num=8#构建文件名队列input_queue = tf.train.slice_input_producer([images, labels],num_epochs=None,shuffle=False) #images labels是tensor列表#构建文件队列,将文件名队列中数据提取到文件队列images_batch,labels_batch = tf.train.batch(input_queue,batch_size=2,num_threads=2,capacity=5) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) #启动线程管理器 coord = tf.train.Coordinator() #启动文件名队列线程 threads = tf.train.start_queue_runners(sess=sess, coord=coord) for i in range(epoch_num): images_out,labels_out = sess.run([images_batch,labels_batch]) print('************************') print (i,images_out,labels_out) coord.request_stop() coord.join(threads)
************************0 [b'img1' b'img2'] [1 2]************************1 [b'img3' b'img4'] [3 4]************************2 [b'img5' b'img1'] [5 1]************************3 [b'img2' b'img3'] [2 3]************************4 [b'img4' b'img5'] [4 5]************************5 [b'img1' b'img2'] [1 2]************************6 [b'img3' b'img4'] [3 4]************************7 [b'img5' b'img1'] [5 1]
转载地址:https://blog.csdn.net/xingghaoyuxitong/article/details/114656148 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
哈哈,博客排版真的漂亮呢~
[***.90.31.176]2024年04月06日 09时35分34秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
Kafka实战(七) - 优雅地部署 Kafka 集群
2019-04-27
Java支付系统(三) - SpringBoot 应用程序搭建
2019-04-27
详解Java业务领域分层模型中的vo/po/dto/pojo/bo
2019-04-27
Java持久层框架MyBatis全注解详解
2019-04-27
Java线程组ThreadGroup
2019-04-27
Java同步器之AbstractOwnableSynchronizer详解
2019-04-27
为什么需要学习并发编程?
2019-04-27
Java计算机IT编程文档常见单词翻译
2019-04-27
Java协作中断机制
2019-04-27
MySQL8.0数据库基础教程(二)-理解"关系"
2019-04-27
2020年最新阿里Java面试题,看看你都会了吗?
2019-04-27
大厂业务开发面试必问的UML你都会了吗?
2019-04-27
MySQL8.0关系数据库基础教程(三)-select语句详解
2019-04-27
JVM参数调优基础-参数的类型详解
2019-04-27
大厂都这么使用MySQL8进行条件查询
2019-04-27
SpringCloud微服务实战(十一)-微服务网关及其实现原理(Zuul为例讲解)
2019-04-27
Java程序员求职热点问题总结(持续更新)
2019-04-27
数据结构与算法(一): 动态数组
2019-04-27