屈天航

屈天航 查看完整档案

北京编辑重庆大学  |  计算机科学与技术 编辑融360  |  高级后端研发工程师 编辑 segmentfault.com/u/qutianhang 编辑
编辑

后端研发工程师(python、golang),专注性能优化、架构设计、微服务等

个人动态

屈天航 关注了专栏 · 1月16日

技术风暴

关注公众号「关山不难越」学习更多前端进阶知识。 Classical is something not fade,but grow more precious with time pass by,so is dream id dream.

关注 5136

屈天航 关注了专栏 · 1月16日

前端开发那些事儿

前端知识:HTML、CSS、JS、React,nodejs、Chrome、数据结构与算法,计算机网络等精华知识分享交流。

关注 6233

屈天航 关注了专栏 · 1月16日

SegmentFault 行业快讯

第一时间为开发者提供行业相关的实时热点资讯

关注 55180

屈天航 关注了用户 · 1月16日

敖丙 @aobing

关注 5795

屈天航 关注了专栏 · 1月16日

SegmentFault 思否观察

SegmentFault 思否对开发者行业的洞见、观察与报道

关注 26737

屈天航 关注了专栏 · 1月16日

程序员哆啦A梦

达达前端技术社群:囊括前端Vue、JavaScript、数据结构与算法、实战演练、Node全栈一线技术,紧跟业界发展步伐,一个热爱前端的达达程序员。

关注 10212

屈天航 关注了专栏 · 1月16日

前端自习课

pingan8787前端开发学习记录

关注 910

屈天航 关注了专栏 · 1月16日

CodeGuide | 程序员编码指南

公众号:bugstack虫洞栈,回复:设计模式,可以下载《重学Java设计模式》PDF,全网下载量17万+ | 这是一本互联网真实案例实践书籍。以落地解决方案为核心,从实际业务中抽离出,交易、营销、秒杀、中间件、源码等22个真实场景,来学习设计模式的运用。

关注 13006

屈天航 关注了专栏 · 1月16日

思否编程 技术分享

思否编程技术内容分享

关注 6503

屈天航 关注了专栏 · 1月16日

vivo 互联网技术

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

关注 6531

屈天航 关注了专栏 · 1月16日

我的Android开源之旅

微信公众号:我的Android开源之旅。 四年工作经验,三年架构经验,五年Github开源经验, 擅长设计模式,架构设计,移动设备通讯。热爱技术,常年活跃在各大移动开发社区,对前沿技术保持高度的学习和关注。 目前在Github平台上Java语言世界排名第400名:http://git-awards.com/users/search?login=xuexiangjys

关注 2500

屈天航 关注了专栏 · 1月16日

进击的大前端

前端工程师,底层技术人。 思否2020年度“Top Writer”! 掘金“优秀作者”! 开源中国2020年度“优秀源创作者” 分享各种大前端进阶知识! 关注公众号【进击的大前端】第一时间获取高质量原创。 更多文章和示例源码请看:https://github.com/dennis-jiang/Front-End-Knowledges

关注 10490

屈天航 关注了用户 · 2020-11-23

小小小小雨 @xiaoxiaoxiaoxiaoyu

关注 1

屈天航 发布了文章 · 2020-11-20

聊一聊python和golang协程的区别

背景

最近在做后端服务python到go的迁移和重构,这两种语言里,最大的特色和优势就是都支持协程。之前主要做python的性能优化和架构优化,一开始觉得两个协程原理和应用应该差不多,后来发现还是有很大的区别,今天就在这里总结一下。

什么是协程

在说它们两者区别前,我们首先聊一下什么是协程,好像它没有一个官方的定义,那就结合平时的应用经验和学习内容来谈谈自己的理解。

协程,其实可以理解为一种特殊的程序调用。特殊的是在执行过程中,在子程序(或者说函数)内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。
注意,它有两个特征:

可中断,这里的中断不是普通的函数调用,而是类似CPU的中断,CPU在这里直接释放转到其他程序断点继续执行。
可恢复,等到合适的时候,可以恢复到中断的地方继续执行,至于什么是合适的时候,我们后面再探讨。

和进程线程的区别

上面两个特点就导致了它相对于线程和进程切换来说极高的执行效率,为什么这么说呢?我们先老生常谈地说一下进程和线程。

进程是操作系统资源分配的基本单位,线程是操作系统调度和执行的最小单位。这两句应该是我们最常听到的两句话,拆开来说,进程是程序的启动实例,拥有代码和打开的文件资源、数据资源、独立的内存空间。线程从属于进程,是程序的实际执行者,一个进程至少包含一个主线程,也可以有更多的子线程,线程拥有自己的栈空间。无论是进程还是线程,都是由操作系统所管理和切换的。

我们再来看协程,它又叫做微线程,但其实它和进程还有线程完全不是一个维度上的概念。进程和线程的切换完全是用户无感,由操作系统控制,从用户态到内核态再到用户态。而协程的切换完全是程序代码控制的,在用户态的切换,就像函数回调的消耗一样,在线程的栈内即可完成。

python的协程(coroutine)

python的协程其实是我们通常意义上的协程coroutine。

从概念上来讲,python的协程同样是在适当的时候可中断可恢复。那么什么是适当的时候呢,就是你认为适当的时候,因为程序在哪里发生协程切换完全控制在开发者手里。当然,对于python来说,由于GIL锁,在CPU密集的代码上做协程切换是没啥意义的,CPU本来就在忙着没偷懒,切换到其他协程,也只是在单核内换个地方忙而已。很明显,我们应该在IO密集的地方来起协程,这样可以让CPU不再空等转而去别的地方干活,才能真正发挥协程的威力。

从实现上来讲,如果熟知了python生成器,还可以将协程理解为生成器+调度策略,生成器中的yield关键字,就可以让生成器函数发生中断,而调度策略,可以驱动着协程的执行和恢复。这样就实现了协程的概念。这里的调度策略可能有很多种,简单的例如忙轮循:while True,更简单的甚至是一个for循环。就可以驱动生成器的运行,因为生成器本身也是可迭代的。复杂的比如可能是基于epool的事件循环,在python2的tornado中,以及python3的asyncio中,都对协程的用法做了更好的封装,通过yield和await就可以使用协程,通过事件循环监控文件描述符状态来驱动协程恢复执行。

我们看一个简单的协程:

import time

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'
def produce(c):
    c.next()
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

if __name__=='__main__':
    c = consumer()
    produce(c)

很明显这是一个传统的生产者-消费者模型,这里consumer函数就是一个协程(生成器),它在n = yield r 的地方发生中断,生产者produce中的c.send(n),可以驱动协程的恢复,并且向协程函数传递数据n,接收返回结果r。 而while n < 5,就是我们所说的调度策略。 在生产中,这种模式很适合我们来做一些pipeline数据的消费,我们不需要写死几个生产者进程几个消费者进程,而是用这种协程的方式,来实现CPU动态地分配调度。

如果你看过上篇文章的话,是不是发现这个golang中流水线模型有点像呢,也是生产者和消费者间进行通信,但go是通过channel这种安全的数据结构,为什么python不需要呢,因为python的协程是在单线程内切换本身就是安全的,换句话说,协程间本身就是串行执行的。而golang则不然。思考一个有意思的问题,如果我们将go流水线模型中channel设置为无缓冲区时,生产者绝对驱动消费者的执行,是不是就跟python很像了呢。所以python的协程从某种意义来说,是不是golang协程的一种特殊情况呢?
后端在线服务中我们更常用的python协程其实是在异步IO框架中使用,之前我们也提过python协程在IO密集的系统中使用才能发挥它的威力。并且大多数的数据中间件都已经提供支持了异步包的支持,这里顺便贴一个python3支持的异步IO库,基本支持了常见的异步数据中间件。

再看一个代码片段,asyncio支持的原生协程:
asyncio支持的基于epool的事件循环:

def main():
    define_options()
    options.parse_command_line()
    # 使用uvloop代替原生事件循环
    # asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    app = tornado.web.Application(handlers=handlers, debug=options.debug)
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    asyncio.get_event_loop().run_forever()

async/await支持的原生协程:

class CreditHandler(BaseHandler):
    async def post(self):
        status, msg, user = self.check_args('uid', 'order_no', 'phone', 'name',
                                            'product_id')
        if status != ErrorCodeConfig.SUCCESS:
            status, msg, report = status, msg, None
        else:                                                                              RcOutputFlowControler())
            status, msg, report = await rcoutput_flow_instance.get_rcoutput_result(user)
        res = self.generate_response_data(status, msg, report)
        await self.finish(res)
      
        await AccompanyRunningFlowControler().get_accompany_data(user)

总结一下python协程的特点:

单线程内切换,适用于IO密集型程序中,可以最大化IO多路复用的效果。
无法利用多核。
协程间完全同步,不会并行。不需要考虑数据安全。
用法多样,可以用在web服务中,也可用在pipeline数据/任务消费中

golang的协程(goroutine)

golang的协程就和传统意义上的协程不大一样了,兼具协程和线程的优势。这也是go最大的特色,就是从语言层面支持并发。Go语言里,启动一个goroutine很容易:go function 就行。

同样从概念上来讲,golang的协程同样是在适当的时候可中断可恢复。当协程中发生channel读写的阻塞或者系统调用时,就会切换到其他协程。具体的代码示例可以看上篇文章,就不再赘述了。

从实现上来说,goroutine可以在多核上运行,从而实现协程并行,我们先直接看下go的调度模型MPG。

image.png

如上图,M指的是Machine,一个M直接关联了一个内核线程。由操作系统管理。
P指的是”processor”,代表了M所需的上下文环境,也是处理用户级代码逻辑的处理器。它负责衔接M和G的调度上下文,将等待执行的G与M对接。
G指的是Goroutine,其实本质上也是一种轻量级的线程。包括了调用栈,重要的调度信息,例如channel等。

每次go调用的时候,都会:

  1. 创建一个G对象,加入到本地队列或者全局队列
  2. 如果还有空闲的P,则创建一个M
  3. M会启动一个底层线程,循环执行能找到的G任务
  4. G任务的执行顺序是,先从本地队列找,本地没有则从全局队列找(一次性转移(全局G个数/P个数)个,再去其它P中找(一次性转移一半)

对于上面的第2-3步,创建一个M,其过程:

  1. 先找到一个空闲的P,如果没有则直接返回,(哈哈,这个地方就保证了进程不会占用超过自己设定的cpu个数)
  2. 调用系统api创建线程,不同的操作系统,调用不一样,其实就是和c语言创建过程是一致的
  3. 然后创建的这个线程里面才是真正做事的,循环执行G任务

当协程发生阻塞切换时:

  1. M0出让P
  2. 创建M1接管P及其任务队列继续执行其他G。
  3. 当阻塞结束后,M0会尝试获取空闲的P,失败的话,就把当前的G放到全局队列的队尾。

这里我们需要注意三点:
1、M与P的数量没有绝对关系,一个M阻塞,P就会去创建或者切换另一个M,所以,即使P的默认数量是1,也有可能会创建很多个M出来。

2、P何时创建:在确定了P的最大数量n后,运行时系统会根据这个数量创建n个P。

3、M何时创建:没有足够的M来关联P并运行其中的可运行的G。比如所有的M此时都阻塞住了,而P中还有很多就绪任务,就会去寻找空闲的M,而没有空闲的,就会去创建新的M。

总结一下go协程的特点:

协程间需要保证数据安全,比如通过channel或锁。
可以利用多核并行执行。
协程间不完全同步,可以并行运行,具体要看channel的设计。
抢占式调度,可能无法实现公平。

coroutine(python)和goroutine(go)的区别

除了python,C#, Lua语言都支持 coroutine 特性。coroutine 与 goroutine 在名字上类似,都是可中断可恢复的协程,它们之间最大的不同是,goroutine 可能在多核上发生并行执行,单但 coroutine 始终是顺序执行。也基于此,我们应该清楚coroutine适用于IO密集程序中,而goroutine在 IO密集和CPU密集中都有很好的表现。不过话说回来,go就一定比python快么,假如在完全IO并发密集的程序中,python的表现反而更好,因为单线程内的协程切换效率更高。

从运行机制上来说,coroutine 的运行机制属于协作式任务处理, 程序需要主动交出控制权,宿主才能获得控制权并将控制权交给其他 coroutine。如果开发者无意间或者故意让应用程序长时间占用 CPU,操作系统也无能为力,表现出来的效果就是计算机很容易失去响应或者死机。goroutine 属于抢占式任务处理,已经和现有的多线程和多进程任务处理非常类似, 虽然无法控制自己获取高优先度支持。但如果发现一个应用程序长时间大量地占用 CPU,那么用户有权终止这个任务。

从协程:线程的对应方式来看

N:1,Python协程模式,多个协程在一个线程中切换。在IO密集时切换效率高,但没有用到多核
1:1,Java多线程模式,每个协程只在一个线程中运行,这样协程和线程没区别,虽然用了多核,但是线程切换开销大。
M:N,go模式,多个协程在多个线程上切换,既可以用到多核,又可以减少切换开销。(当都是cpu密集时,在多核上切换好,当都是io密集时,在单核上切换好)。

从协程通信和调度机制来看
image.png

查看原文

赞 7 收藏 6 评论 0

屈天航 发布了文章 · 2020-11-18

Golang协程并发的流水线模型

背景

最近由于性能问题,后端服务一直在做python到golang的迁移和重构。go语言精简优雅,既有编译型语言的严谨和高性能,又有解释型语言的开发效率,出色的并发性能也是go区别于其他语言的一大特色。go的并发编程代码虽然简单,但重在其并发模型和流程的设计。这里就总结下golang协程并发常用的流水线模型。

简单的流水线思维

流水线模式并不是什么新奇的概念,但是它能极大地提高生产效率。比如实际生活中的汽车生产流水线,流水线上的每一个流程负责不同的工作,比如第一个流程是拼装车身,第二个流程是安装发动机,第三个流程是装轮胎...,这些步骤我们可以类比成go并发流程中的协程,每一个协程就是一个任务。流水线上面传递的车身、发动机、轮胎,这些我们可以类比成协程间需要传递的数据,而在这些流程(协程)间传递这些配件(数据),自然就要通过传送带(channel)。在流水线上,我们装四个轮胎肯定不是一个一个来装的,肯定是有四个机械臂同时来装。因此装轮胎这个步骤我们有4个协程在并发工作来提高效率。这么一来,流水线模型的基本要素就构成了。
Golang的并发模型灵感其实都来自我们生活,对程序而言,高的生产效率就是高的性能。在Golang中,流水线由多个流程节点组成,流程之间通过channel连接,每个流程节点可以由多个同时运行的goroutine组成。
image.png

如何构造流水线

有了流水线模式的思维,接下来就是如何构造流水线了。简单来说,其实就是通过channel将任务流程连接起来,两个相邻的流程互为生产者和消费者,通过channel进行通信。耗时的流程可以将任务分散到多个协程来执行。
我们先来看一个最简单的流水线,如下图,A是生产者流程,B是它的消费流程,同时又是C的生产者流程。A,B,C三个协程直接,通过读写channel进行通信。
image.png

那如果此时B流程可以将a channel中的任务并发执行呢,很简单,我们只需要起多个B协程就可以了。如下图。
image.png

总之,我们构造流水线并发的思路是关注数据的流动,数据流动的过程交给channel,channel两端数据处理的每个环节都交给goroutine,这个流程连起来,就构成了流水线模型。

关于channel

为什么我们可以选择channel来进行协程间的通信呢,协程之间又是怎么保持同步顺序呢,当然这都要归功于channel。channel是go提供的进程内协程间的通信方式,它是协程/线程安全的,channe的读写阻塞会导致协程的切换。
channel的操作和状态组合可以有以下几种情况:
image.png

**有1个特殊场景**:当`nil`的通道在`select`的某个`case`中时,这个case会阻塞,但不会造成死锁。

channel不仅可以保证协程安全的数据流动,还可以保证协程的同步。当有并发问题时,channel也是我们首先应该想到的数据结构。不过显而易见,当使用有缓冲区的channel时,才能达到协程并发的效果,并且生产者和消费者的协程间是相对同步的。使用无缓冲区的channel时,是没有并发效果的,协程间是绝对同步的,生产者和消费者必须同时写和读协程才能运行。
channel关注的是数据的流动,这种场景下都可以考虑使用channel。比如:消息传递、信号广播、任务分发、结果汇总、同步与异步、并发控制... 更多的不在这里赘述了,总之,Share memory by communicating, don't communicate by sharing memory.

流水线模型实例

举个简单栗子,计算80000以内的质数并输出。
这个例子如果我们采用非并发的方式,就是for循环80000,挨个判断是不是素数再输出。不过如果我们采用流水线的并发模型会更高效。

从数据流动的角度来分析,需要遍历生成1-80000的数字到一个channel中,数字判断是否为素数,输出结果到一个channel中。因此我们需要两个channel,channel的两端就设计成协程即可。
1、遍历生成原始80000个数据(生产者)
2、计算这80000个数据中的素数(生产者+消费者)
3、取结果输出(消费者)

代码如下:

package gen_channel
import "fmt"
import "time"
func generate_source(data_source_chan chan int) {
   for i := 1; i <= 80000; i++ {
      data_source_chan <- i
   }
   fmt.Println("写入协程结束")
   close(data_source_chan)
}
func generate_sushu(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool) {
   for num:= range data_source_chan {
      falg := true
 for i := 2; i < num; i++ {
         if num%i == 0 {
            falg = false
 break }
      }
      if falg == true {
         data_result_chan <- num
      }
   }
   fmt.Println("该协程结束")
   gen_chan <- true
}
func workpool(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool, gen_num int){
   // 开启8个协程
 for i := 0; i < gen_num; i++ {
      go generate_sushu(data_source_chan, data_result_chan, gen_chan)
   }
}
func Channel_main() {
   // 任务数据
   data_source_chan := make(chan int, 2000)
   // 结果数据
   data_result_chan := make(chan int, 2000)
   // 所有任务协程是否结束
   gen_chan := make(chan bool, 8)
   time1 := time.Now().Unix()
   go generate_source(data_source_chan)
   // 协程池,任务分发
   workpool(data_source_chan, data_result_chan, gen_chan, 8)
   // 所有协程结束后关闭结果数据channel
   go func() {
      for i := 0; i < 8; i++ {
         <-gen_chan
      }
      close(data_result_chan)
      fmt.Println("spend timeis ", time.Now().Unix()-time1)
   }()
   for date_result := range data_result_chan {
      fmt.Println(date_result)
   }
}

上面这段代码中。data_source_chandata_result_chan这两个channel分别用来放原始数据和结果数据,buffer分别为2000。

generate_source协程: 生产数据,它会把数据写入data_source_chan通道,全部写入完成后关闭通道。
generate_sushu协程: 负责计算并判断data_source_chan中的数据是否为质数,是的话就写入data_result_chan通道。
主协程for date_result := range data_result_chan: 最后负责读取data_result_chan中的结果,直到data_result_chan关闭后结束程序。

可以看到我们通过workpool方法起了8个generate_sushu协程来并发处理data_source_chan的任务。那么就有一个问题,如何知道所有数据都已处理完毕呢,等到生产者generate_source协程结束data_source_chan关闭吗? 恐怕不是,因为可能data_source_chan关闭后8个任务协程仍然在继续计算。那么只能等8个协程全部处理完毕后,才能说明所有数据已处理完,从而才能关闭data_result_chan,然后主协程读取data_result_chan结束。

因此我们这里引入了另一个channel:gen_chan,来记录计算结束的任务。每个generate_sushu协程处理完,就写入一个记录到channel中。因此我们有一个匿名协程,当可以从gen_chan中取8个结果出来的话,就说明所有协程已计算完成,那么可以关上阻塞程序的最后阀门data_result_chan

当然这种设计方式并不唯一,我们也可以不用统一的data_result_chan来接收结果,而是每个协程分配一个channel来存放结果,最后再merge到一起。

可能大家觉得这种方式很复杂,确实比较高效但写起来并不友好,那有没有更友好的方式呢?

sync包

在处理并发任务时我们首先想到的应该是channel,但有时候channel不是万能或者最方便的,所以go也为我们提供了sync包。

sync包提供了各种异步及锁类型及其内置方法。用起来也很方便,比如Mutex就是给协程加锁,某个时段内不能有多个协程访问同一段代码。WaitGroup就是等待一些工作完成后,再进行下一步工作。Once可以用来确保协程中某个函数只执行1次...当我们面对一个并发问题的时候,应该去分析采用哪种协程同步方式,是channel还是Mutex呢。这需要看我们关注的是数据的流动还是数据的安全性。篇幅原因这里不再展开讲了。

  1. Mutex:互斥锁
  2. RWMutex:读写锁
  3. WaitGroup:等待组
  4. Once:单次执行
  5. Cond:信号量
  6. Pool:临时对象池
  7. Map:自带锁的map

我们接着上面质数的问题,使用sync中的WaitGroup,会让我们的代码更加友好,因为我们不需要引入一个channel来记录是否4个车轮都换完了,让WaitGroup来做就好了。

 package gen_channel
import (
   "fmt"
 "time")
import "sync"
func generate_source3(data_source_chan chan int) {
   for i := 1; i <= 80000; i++ {
      data_source_chan <- i
   }
   fmt.Println("写入协程结束")
   close(data_source_chan)
}
func generate_sushu3(data_source_chan, data_result_chan chan int, wg *sync.WaitGroup) {
   defer wg.Done()
   for num := range data_source_chan {
      falg := true
 for i := 2; i < num; i++ {
         if num%i == 0 {
            falg = false
 break }
      }
      if falg == true {
         data_result_chan <- num
      }
   }
   fmt.Println("该协程结束")
}
func workpool3(data_source_chan chan int, data_result_chan chan int, wg *sync.WaitGroup, gen_num int) {
   // 开启8个协程
 for i := 0; i < gen_num; i++ {
      wg.Add(1)
      go generate_sushu3(data_source_chan, data_result_chan, wg)
   }
}
func Channel_main3() {
   data_source_chan := make(chan int, 500)
   data_result_chan := make(chan int, 2000)
   time1 := time.Now().Unix()
   var wg sync.WaitGroup
 go generate_source3(data_source_chan)
   // 开启8个协程
 for i := 0; i < 8; i++ {
      wg.Add(1)
      go generate_sushu3(data_source_chan, data_result_chan, &wg)
   }
   wg.Wait()
   close(data_result_chan)
   fmt.Println("spend timeis ", time.Now().Unix()-time1)
   for date_result := range data_result_chan {
      fmt.Println(date_result)
   }
}

总结

流水线模式的设计要关注数据的流动,然后在数据流动的路径中将数据放到channel中,将channel的两端设计成协程。
并发设计中channel和sync可以从开发效率和性能的角度自由组合,channel不一定是最优解
写入channel的协程来控制该协程的关闭,消费者协程不关闭读协程,防止报错。养成在协程入口限制channel读写类型的习惯。

以上是我们在go并发的流水线模型中的一些总结。可以看出go的协程并发更考验我们的设计能力,因为协程间的同步和数据传递都交给了开发者来设计。同时也留给我们一些引申思考,协程在IO密集和CPU密集的情况下是否都能大幅提高性能呢?是否和channel的缓冲区或者并发设计有关呢?协程异常该怎么处理呢?go的协程和python的协程又有什么区别呢?...我们后面慢慢探讨~

查看原文

赞 18 收藏 13 评论 0

屈天航 关注了用户 · 2020-11-11

大彬 @lessisbetter

公众号:Go语言充电站
二维码:https://segmentfault.com/img/...

关注 857

屈天航 发布了文章 · 2020-11-06

阿里云对象存储OSS的python SDK示例

背景

最近公司项目需要使用阿里云的oss存储来线上实时存储图片文件。因此调研开发了python版本的阿里云oss SDK。这里我们介绍一下oss以及python oss的常用方法。

oss介绍

阿里云对象存储OSS(Object Storage Service)是阿里云提供的海量、安全、低成本、高持久的云存储服务。其数据设计持久性不低于99.9999999999%(12个9),服务可用性(或业务连续性)不低于99.995%。
OSS具有与平台无关的RESTful API接口,您可以在任何应用、任何时间、任何地点存储和访问任意类型的数据。
您可以使用阿里云提供的API、SDK接口或者OSS迁移工具轻松地将海量数据移入或移出阿里云OSS。数据存储到阿里云OSS以后,您可以选择标准存储(Standard)作为移动应用、大型网站、图片分享或热点音视频的主要存储方式,也可以选择成本更低、存储期限更长的低频访问存储(Infrequent Access)、归档存储(Archive)、冷归档存储(Cold Archive)作为不经常访问数据的存储方式。
以上是阿里云官方对于oss的介绍,总结来看,就是其可以提供高可用,高持久、低成本、支持多种文件及数据格式、支持RESTful API接口的云存储服务。就我们实际使用来说确实还不错,成本的话我们申请的海外节点,30多T数据每月差不多5000,也算比较低了。

在代码示例前,我们先认识一些概念:

存储类型(Storage Class)

OSS提供标准、低频访问、归档、冷归档四种存储类型,全面覆盖从热到冷的各种数据存储场景。其中标准存储类型提供高持久、高可用、高性能的对象存储服务,能够支持频繁的数据访问;低频访问存储类型适合长期保存不经常访问的数据(平均每月访问频率1到2次),存储单价低于标准类型;归档存储类型适合需要长期保存(建议半年以上)的归档数据;冷归档存储适合需要超长时间存放的极冷数据。

存储空间(Bucket)

存储空间是您用于存储对象(Object)的容器,所有的对象都必须隶属于某个存储空间。存储空间具有各种配置属性,包括地域、访问权限、存储类型等。您可以根据实际需求,创建不同类型的存储空间来存储不同的数据。

对象(Object)

对象是OSS存储数据的基本单元,也被称为OSS的文件。对象由元信息(Object Meta)、用户数据(Data)和文件名(Key)组成。对象由存储空间内部唯一的Key来标识。对象元信息是一组键值对,表示了对象的一些属性,例如最后修改时间、大小等信息,同时您也可以在元信息中存储一些自定义的信息。

地域(Region)

地域表示OSS的数据中心所在物理位置。您可以根据费用、请求来源等选择合适的地域创建Bucket。

访问域名(Endpoint)

Endpoint表示OSS对外服务的访问域名。OSS以HTTP RESTful API的形式对外提供服务,当访问不同地域的时候,需要不同的域名。通过内网和外网访问同一个地域所需要的域名也是不同的。

访问密钥(AccessKey)

AccessKey简称AK,指的是访问身份验证中用到的AccessKey Id和AccessKey Secret。OSS通过使用AccessKey Id和AccessKey Secret对称加密的方法来验证某个请求的发送者身份。AccessKey Id用于标识用户;AccessKey Secret是用户用于加密签名字符串和OSS用来验证签名字符串的密钥,必须保密。

oss python SDK

OssClient支持的方法:
1、上传文件对象到oss存储空间
2、上传本地指定路径文件到oss存储空间
3、下载文件到文件流对象
4、下载文件到本地指定路径
5、生成加签的临时URL以供授信用户下载

以上功能基本可以满足日常业务需求,当然oss也提供了很多个性化的操作,比如断点下载、范围下载、断点上传、追加上传、上传回调、图片调整等等,这里不再赘述,都有专门的接口方法。
以下列举了接口实现方法,并没有在其中加入各种判断逻辑,比如文件是否存在、权限控制、是否覆盖、是否加密等等。有需要的可以自己添加逻辑

import oss2


AccessKeyId = 'LTA************hpmoN9'
AccessKeySecret = '0ise*************bkIyF'
BucketName = '*******'
Endpoint = 'http://oss-ap-south-1.aliyuncs.com'


class OssClient(object):
    __instance = None
    __first_init = False

    # 单例模式
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super().__new__(cls)
        return cls.__instance

    def __init__(self):
        cls = self.__class__
        if not cls.__first_init:
            self.auth = oss2.Auth(AccessKeyId, AccessKeySecret)
            self.bucket = oss2.Bucket(self.auth, Endpoint, BucketName)
            cls.__first_init = True


    def upload_file_from_fileobj(self):
        """
            upload_file_from_fileobj方法:上传文件对象到oss存储空间, 该方法可用于我们从上游服务接收了图片参数,然后以二进制形式读文件,上传到oss存储空间指定位置(abc/efg/00),
        当然也可以将本地文件上传到oss我们的bucket. 其中fileobj不止可以是文件对象,也可以是本地文件路径。 put_object方法底层仍是RESTful API的调用,可以指定headers,规定Content-Type等内容
        """
        # 判断bucket中文件是否存在,也可以不判断,会上传更新
        exist = self.bucket.object_exists('abc/efg/00') #<yourObjectName>
        if exist:
            return True
        with open('/home/rong/www/0', 'rb') as fileobj:
            result = self.bucket.put_object('abc/efg/00', fileobj, headers=None) #<yourObjectName>
        if result.status == 200:
            return True
        else:
            return False


    def upload_file_from_loaclfilepath(self):
        """
            upload_file_from_loaclfilepath:上传本地指定路径文件(/home/rong/www/0)到oss存储空间指定位置(abc/efg/0)。与put_object方法不同,put_object_from_file的第二个参数只能是本地文件路径
        """
        # 判断bucket中文件是否存在,也可以不判断,会上传更新
        exist = self.bucket.object_exists('abc/efg/0') #<yourObjectName>
        if exist:
            return True
        result = self.bucket.put_object_from_file('abc/efg/0', '/home/rong/www/0', headers=None) #(<yourObjectName>, <yourLocalFile>)
        if result.status == 200:
            return True
        else:
            return False


    def download_file_to_fileobj(self):
        """
            download_file_to_fileobj:下载文件到文件流对象。由于get_object接口返回的是一个stream流,需要执行read()后才能计算出返回Object数据的CRC checksum,因此需要在调用该接口后做CRC校验。
        """
        object_stream = self.bucket.get_object('abc/efg/0') #<yourObjectName>
        result = object_stream.read()
        if object_stream.client_crc != object_stream.server_crc:
            print("The CRC checksum between client and server is inconsistent!")
            result = None
        return result


    def download_file_to_loaclfilepath(self):
        """
            download_file_to_loaclfilepath:下载文件到本地路径。get_object和get_object_to_file的区别是前者是获取文件流实例,可用于代码处理和远程调用参赛。后者是存储到本地路径,返回的是一个http状态的json结果
        """
        result = self.bucket.get_object_to_file('abc/efg/0', '/home/rong/www/download/0') # ('<yourObjectName>', '<yourLocalFile>')
        if result.status == 200:
            return True
        else:
            return False

    def generate_temporary_download_url(self):
        """
            generate_temporary_download_url: 生成加签的临时URL以供授信用户下载。一般在实际业务中,我们是提供给调用方一个临时下载链接,来让其获取文件数据,而不是直接使用以上暴露AccessKeyId和AccessKeySecret的方法。
            因此一般我们会存储某条数据oss的路径(<yourObjectName>)与调用方某个唯一标识的对应关系(如手机号身份证号),在调用方请求时,通过该标识获取其数据的oss文件路径(<yourObjectName>),
            然后制定过期时间,为其生成临时下载链接
            http://bucketname.oss-ap-south-1.aliyuncs.com/abc/efg/0?OSSAccessKeyId=LTA************oN9&Expires=1604638842&Signature=tPgvWz*************Uk%3D
        """
        res_temporary_url = self.bucket.sign_url('GET', 'abc/efg/0', 60, slash_safe=True)
        return res_temporary_url


if __name__ == '__main__':
    oss_client = OssClient()
    print(oss_client.bucket.bucket_name)
    print(oss_client.bucket.ACL)
    from itertools import islice
    for b in islice(oss2.ObjectIterator(oss_client.bucket), 10):
        print(b.key)
    ...
查看原文

赞 0 收藏 0 评论 0

屈天航 发布了文章 · 2020-09-22

机器学习模型python在线服务部署的两种实例

背景

众所周知python在机器学习实践中的应用广泛深入,而在我们业务中的应用集中在提供线上实时风控输出服务,比如国内业务的模型在线服务架构和海外业务的后台决策引擎架构。这两种应用的结合就要求我们考虑如何高效安全便捷地来实现模型的在线部署,为上游提供服务。
   在我们的考虑中,无论是代码复杂程度和业务场景,还是语言本身的特点,模型部署都有趋于向微服务架构转型的趋势和需要。一方面,需要进行代码分离来明确责任分工提高开发效率和容错性。另外一个方面,python在CPU密集型的应用中表现是无法令人满意的。为了使用协程来提高异步性从而处理更多的并发请求,最直接地就是将CPU密集转化为IO密集,因为Python天生就适合IO密集型的网络应用。
   因此,我们生产中将模型计算抽取为model_lib代码库,并且通过微服务online_model进行交互。这里我们调研过两种模型部署的方式,最终选择了第一种。

一、基于flask框架进行模型部署

Flask是一个轻量级的可定制框架,具有灵活、轻便且高效的特点,并且是标准的wsgi接口框架,易于扩展和维护。

1. 为什么选用nginx+uwsgi+flask这种技术架构

1)    Uwsgi搭配nginx性能快,内存占用低,高度可定制,自带详尽日志功能,支持平滑重启。

2)    Flask完全兼容了wsgi标准; 微框架,扩展性强; 完全基于unicode,不需处理编码问题;自带服务可独立做单元测试及开发。
image.png

3)    我们客户端采用了tornado协程,已经实现了将cpu计算转为io操作,服务端完全是CPU密集的模型计算,不会释放进程,异步框架保持大量文件描述符状态耗费内存,因此不适用异步IO框架。

2. 业务流程框架

image.png

3. 部署方式:

部署方式采用nginx+uwsgi+flask的方式,uwsgi可直接接受socket而不是http请求提高性能,再将服务转发给flask框架,这里注意flask此类wsgi标准接口的服务框架比如djangoweb.py在生产中一般不使用自带服务,而是在上层部署uwsgi或者gunicorn作为服务器来进行服务转发,上层再用nginx来做负载均衡,这样可以提高服务稳定性和性能。
(这里的inrouter层是我们自己封装的路由层,用来做集群中的路由转发,nginx只在本地做了端口转发。)
image.png

4. 代码示例:

uwsgi服务配置:

[uwsgi]
# 监听端口
socket=127.0.0.1:8200
# 进程数
processes=20

;async=4
;threads=2
;enable-threads = true
# 运行的目录
chdir = /home/rong/www/online_model
# wsgi文件
wsgi-file = model_main.py
callable=app
# 是否要有主进程
master = true
# 后台运行及其打印的日志
daemonize = /home/rong/www/log/uwsgi.log
# 主进程pid文件
pidfile = /home/rong/www/log/online_model/pid/uwsgi.pid
# 日志切割大小
log-maxsize = 5000000
# 不记录请求信息的日志。只记录错误以及uWSGI内部消息到日志中。
disable-logging = false
# 超时时间
http-timeout= 3
# 传输数据大小限制
buffer-size  = 1048576
# 每个进程单独加载
lazy-apps = true

flask服务关键代码:

import importlib
import json
import cProfile
import pstats
import StringIO
import time
import traceback
from flask import Flask, request
from common import rong_logger
from common.global_variable import StaticCacheClass
import autopath  # 不能去掉

app = Flask(__name__)
# 这里是模型代码库的统一入口,模型代码库中是通过抽象类实现的规范化的模型代码实例,通过此服务提供调用,也通过离线调度进行跑批任务。保证线上线下模型调用一致性
online_model_main = importlib.import_module('online_model_main')

MUST_PARAMS = ['resource_id', 'feature_dict']
SUCCESS_STATUS = 0
ERROR_STATUS = 1


# 路由函数只允许post请求
@app.route("/", methods=['POST'])
def model_main():
    uniq_id = '[%s][%s]' % (request.form.get('resource_id', ''), request.url)
    try:
        status, msg, params = _check_params(request)
        if status != SUCCESS_STATUS:
            rong_logger.error(uniq_id + 'params error, detail: %s' % msg)
            status, msg, result = status, msg, None
        else:
            resource_id, feature_dict = params['resource_id'], json.loads(params['feature_dict'])
            status, msg, result = online_model_main.main(resource_id, feature_dict, request, rong_logger)
            rong_logger.info(uniq_id + '[%s][%s][%s]' % (status, msg, result))

    except Exception as e:
        rong_logger.error(uniq_id + 'error: %s, detail: %s' % (str(e), traceback.format_exc()))
        status, msg, result = 5, 'online_model_error:' + str(e), None
    return _get_response(status, msg, result)

模型代码库模型实例:

其中 XgboostExecutor类是基于xgb模型抽象类实现的类,通过它来实例化一个模型对象,给flask应用提供调用。具体我们不再深究,有兴趣可以再写一个专题来介绍模型代码库的部署。

# -*- coding:utf-8 -*-
# !/usr/bin/python

import logging
import os

from executor.src.load_helper import read_cur_path
from executor.xgboost_model_executor import XgboostExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s:%(message)s')




[model_path, features_path,feature_importance_path] = map(
    read_cur_path, ["xgb_model", "feature_list","feature_importance"]
)
model = XgboostExecutor(model_path, features_path,
                        feature_check_white_list=["n21_score"],
                        white_or_weight=False,
                        feature_check_weight_limit= 1,
                        feature_importance_path=feature_importance_path,
                        manager="qutianhang@xx.com",
                        developer="qutianhang@xx.com",
                        correlation="negative")

5. 性能比对

微服务改造后20并发请求模型:
image.png
微服务改造前20并发请求模型:
image.png

本机测试并发性能就提高了20%,但注意这是在高并发的情况下,就单条请求来看,微服务并不能显著提高性能。

二、 基于grpc进行在线模型部署

在 gRPC 里客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,能够更容易地创建分布式应用和微服务。与许多 RPC 系统类似,gRPC 也是基于以下理念:定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根能够执行在服务端实现的一样的方法(这个方法就类似于接口)
image.png

1. 为什么选用grpc进行模型部署

  • 1)grpc使用ProtoBuf来定义服务、请求返回的数据格式,压缩和传输效率高,语法简单,表达力强。(如下为ProtoBuf的序列化和反序列话性能表现

image.pngimage.png

  • 2)grpc可支持tensorflow serving的接口调用,tensorflow完成模型训练和部署后,可提供服务接口,给grpc进行调用。实现方便,高效,自带版本管理、模型热更新等,很适合大规模线上业务,是我们下一步模型部署的技术方向。
  • 3)gRPC支持多种语言,并能够基于语言自动生成客户端和服务端功能库。

2. 部署方式(业务流程与之前相同)

部署方式采用nginx+grpc,要求nginx支持http2.0。在客户端将json特征字典转为protobuf。(https://github.com/NextTuesday/py-pb-converters/blob/master/pbjson.py 这里附上json和protobuf互相转化的脚本。)

image.png

3. 服务发现与负载均衡

image.png

4. 开发流程

image.png

客户端:
image.png

服务端:
image.png

三、 两种方式线上模型部署对比

1)    grpc使用protbuf更加复杂,需要在客户端服务端均保留protbuf文件并做校验,而flask只需要做好统一的接口标准规范即可。

2)    grpc使用http2.0更适用移动端的在线模型预测,或者基于tensorflowd的大规模线上模型部署和预测,flask更适用后端面向服务的手动模型部署和预测。

3) grpc节省数据空间,但与python交互需要做json和protbuf数据转换,flask兼容wsgi标准,适用于RESTful类服务,但数据传输占用空间较大。

查看原文

赞 4 收藏 1 评论 0

屈天航 关注了用户 · 2020-09-08

陈星星 @chen_xingxing

谜一样的烟火🎆

关注 32

屈天航 关注了用户 · 2020-09-08

ronniesong @sxssxs

Seek the truth!

关注 136