手记

Rails多线程和DB connection pool

在rails的多线程编程中,connection pool(连接池)是一种常见解决方案,在一些IO操作中,connection pool能够降低多线程操作的并发成本,提高并发能力。常见的有开源项目:https://github.com/mperham/connection_pool,比如在rails项目中使用redis就可以设置连接池:

$redis = ConnectionPool.new(size: 5, timeout: 5) { Redis.connect }
$redis.with do |conn|
  conn.sadd('foo', 1)
  conn.smembers('foo')end

还有一个比较常见的连接池使用是在rails项目的database配置中,config/database.yml:

production:
  url:  <%= ENV["DATABASE_URL"] %>  pool: <%= ENV["DB_POOL"] || ENV['MAX_THREADS'] || 5 %>

刚好我们的一个项目在这个Database的connection pool上,碰到了一些问题,这里稍微做一些探讨。

问题

我们的项目中有一个并行的需求,因为要对某一时间段数据做大量的查询和聚合运算,查询次数会比较大,对数据库单个SQL做优化后查询时间还是比较长,鉴于是DB IO阻塞操作,所以这里使用了多线程并行查询,优化整体请求时间,我们用了多线程之后,请求处理时间得到优化,但是并发能力反倒下降,常常遇到报错:

ActiveRecord::ConnectionTimeoutError - could not obtain a database connection within 5 seconds. The max pool size is currently 5; consider increasing it

技术栈

这里先对项目做个大致的介绍,该项目使用的框架是Ruby on Rails,数据库是postgreSQL,model层则使用的原生的ActiveRecord,多线程并行部分是在一个model中使用,业务是对一些日期中每天的每个设备的不同指标的读数做一些运算放到数组中,并行则是在这个库:https://github.com/grosser/parallel 的帮助下实现:

Parallel.each(dates, in_threads: 10) do |day| # 第一层循环:对dates中的每天循环执行,并且调用Parallel开启10线程
  date = Date.parse day
  time_s, time_e = generate_time(day, from, to)
  averages = devices.map do |device_id| #第二层循环:对devices循环
     average_by_time_range(device_id, time_s, time_e) #调用查询方法
  end
  daily_average_results[day] = {    date: day,    data: averages
  }enddef average_by_time_range(device_id, time_s, time_e)
  average = {device_id: device_id, four_hours_lost: false, data_complete: data_complete}
  average_sql = "SELECT AVG(pm2p5) \"pm2p5\", AVG(co2) \"co2\", AVG(tvoc) \"tvoc\"  FROM monitor_hourly_readings WHERE device_id = #{device_id}  AND reading_time BETWEEN '#{time_s}' AND '#{time_e}' "
  results = ActiveRecord::Base.connection.execute(average_sql)[0] #调用sql
  [:pm2p5, :co2, :tvoc].each do |indicator_name|
    average_reading = results[indicator_name.to_s]    if !average_reading
      average[indicator_name] = nil
    elsif check_four_hours_lost?(device_id, indicator_name ,time_s, time_e) #调用判断方法
      average[indicator_name] = nil
      average[:four_hours_lost] = true
    else
      average[indicator_name] = average_reading.to_f    end

  end
  averageenddef check_four_hours_lost?(device_id, indicator_name ,time_s, time_e)
  four_hours_failed_sql = ["SELECT COUNT(*) FROM ( SELECT *, reading_time - LAG(reading_time)
    OVER (order by reading_time) AS change FROM monitor_hourly_readings WHERE device_id = ?
    AND  reading_time >= ? AND reading_time <= ? AND #{indicator_name} IS NOT NULL) AS inner_table WHERE change >= interval '4 hours'",
    device_id, time_s, time_e]
  count_by_sql(four_hours_failed_sql) > 0#调用sql查询end

这一段代码开始用parallel实现10个thread来对一段时间中每天的结果做查询和计算,具体每天的查询则是在方法 average_by_time_range 中实现,即ActiveRecord::Base.connection.execute() 执行了一段SQL。同时, average_by_time_range 中调用了 check_four_hours_lost? 方法完成一段SQL查询的判断, check_four_hours_lost? 中使用了ActiveRecord内建的 count_by_sql 方法。所以这里在两个方法中分别调用了数据库,并且嵌在了dates和devices两层循环之中,所以查询次数相对较大。

解决

面对如上问题做了一些简单的尝试去修改代码和测试,按照思路依次做了如下修改:

  1. 修改config
    在发现ActiveRecord::ConnectionTimeoutError报错常出现之后,最暴力的解决方案就是加大了config/database.yml中的pool数字,因为是内部工具项目,对并发的要求不是很高,一开始database.yml中的pool数字是10,这里修改到常见的25,测试之下并发能力有提升但是有限。如此小的并发下继续会报错,所以觉得最大的问题还是在并行使用中的错误用法了。

  2. 降低pool

所以在加大pool数字效果不明显的情况下考虑是否应该降低并行线程的数量,顺便也做了一些简单测试,发现在服务器上10线程相对4线程数据库IO效率提高有限,这里就将thread降低到了4:

Parallel.each(dates, in_threads: 4) do |day| 
  ...end

做相应测试,确实并发能力提高不少,但是依旧不能满意,在使用时候还是会发生ActiveRecord::ConnectionTimeoutError

  1. 使用with_conneciton
    查看log,看问题发生的位置,是在手动调用sql的那句上:

 results = ActiveRecord::Base.connection.execute(average_sql)[0] #调用sql

感觉在对parallel的使用上还是存在问题,或者 ActiveRecord::Base.connection的使用会有特殊要求,回到parallel的文档上,查看有没有在parallel中使用ActiveRecord的具体说明,果然,还是要认真看文档的, parallel中给出了使用如何搭配使用ActiveRecord的sample:

# reproducibly fixes things (spec/cases/map_with_ar.rb)Parallel.each(User.all, in_processes: 8) do |user|
  user.update_attribute(:some_attribute, some_value)endUser.connection.reconnect!# maybe helps: explicitly use connection poolParallel.each(User.all, in_threads: 8) do |user|
  ActiveRecord::Base.connection_pool.with_connection do
    user.update_attribute(:some_attribute, some_value)  endend# maybe helps: reconnect once inside every forkParallel.each(User.all, in_processes: 8) do |user|
  @reconnected ||= User.connection.reconnect! || true
  user.update_attribute(:some_attribute, some_value)end

这段sample里可以看到,parallel使用时候还是要对ActiveRecord的connection做特殊处理的,比如 reconnect! 方法或者放在 with_connection 方法。因此考虑原因,是否因为在使用ActiveRecord::Base.connection的时候得手动关闭connection,查看ActiveRecord的connection_pool实现的源码。
ActiveRecord没有使用之前提到的开源的connection pool,而是自己实现了一套connection pool,class的层级结构大致如下:

  • class ConnectionPool

    • class Queue

    • class Reaper

这里的Queue 主要功能是用来存放链接池中创建的connections

def initialize(lock = Monitor.new)
  @lock = lock
  @cond = @lock.new_cond
  @num_waiting = 0
  @queue = []end

可以看到,Queue使用Monitor作为线程锁来保证多线程的thread safe,使用数组来存放connections,以数组的push和slice方法实现先进先出的存储堆结构,在此之上实现了add, clear, remove等功能方法,其中值得注意的是实现了基于timeout的轮询机制:

def poll(timeout = nil)
  synchronize do
    if timeout
      no_wait_poll || wait_poll(timeout)    else
      no_wait_poll    end
  endend# Waits on the queue up to +timeout+ seconds, then removes and# returns the head of the queue.def wait_poll(timeout)
  @num_waiting += 1

  t0 = Time.now
  elapsed = 0
  loop do
    @cond.wait(timeout - elapsed)    return remove if any?

    elapsed = Time.now - t0
    if elapsed >= timeout
      msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
        [timeout, elapsed]
      raise ConnectionTimeoutError, msg    end
  endensure
  @num_waiting -= 1end

然后Reaper这个类的作用就比较简单了,会在每个connection_pool实例化的时候另起一个线程,用来定时清除未被正常释放的connection,整个实现才一二十行:

  # Every +frequency+ seconds, the reaper will call +reap+ on +pool+.
  # A reaper instantiated with a nil frequency will never reap the
  # connection pool.
  #
  # Configure the frequency by setting "reaping_frequency" in your
  # database yaml file.
  class Reaper
    attr_reader :pool, :frequency

    def initialize(pool, frequency)
      @pool      = pool
      @frequency = frequency    end

    def run
      return unless frequency
      Thread.new(frequency, pool) { |t, p|
        while true
          sleep t
          p.reap        end
      }    end
  end

这里比较关键的是 frequency 变量,会在connection_pool实例化的时候传入,默认数值是5 seconds,所以对于没有正确释放的connection会在5秒后自动回收。

ConnectionPool类相对比较复杂,里面值得注意的几个方法

def connection
  # this is correctly done double-checked locking
  # (ThreadSafe::Cache's lookups have volatile semantics)
  @reserved_connections[current_connection_id] || synchronize do
    @reserved_connections[current_connection_id] ||= checkout  endenddef with_connection
  connection_id = current_connection_id
  fresh_connection = true unless active_connection?  yield connectionensure
  release_connection(connection_id) if fresh_connectionenddef checkout
  synchronize do
    conn = acquire_connection
    conn.lease
    checkout_and_verify(conn)  endenddef checkin(conn)
  synchronize do
    conn.run_callbacks :checkin do
      conn.expire    end

    release conn

    @available.add conn  endenddef acquire_connection
  if conn = @available.poll
    conn  elsif @connections.size < @size
    checkout_new_connection  else
    @available.poll(@checkout_timeout)  endenddef current_connection_id #:nodoc:
  Base.connection_id ||= Thread.current.object_idend

从pool里获取可用connection的三个公开方法: connection, with_connection, checkout,而checkin 则是回收该线程的connection,每个connection是用所属线程的object_id来标记的,见current_connection_id方法。
这几个获取connection方法有细微区别,connection方法是获取当前线程的connection,没有则通过checkout创建,with_connection则是获取或者创建新后讲connection传入内block执行,并且用ensure语句保证在执行完成之后release这个connectioncheckout 方法则是通过调用私有方法:acquire_connection 获取新连接。
所以我们回到之前我们自己的业务代码,就发现我们使用的results = ActiveRecord::Base.connection.execute(average_sql)[0]是不会主动释放connection的,这样需要等到Reaper在五秒之后自动释放回收,所以对于多线程情况,我们应该将代码放在with_connection方法的block中,来保证 connection 及时的release,增加并行能力。所以我们将:

results = ActiveRecord::Base.connection.execute(average_sql)[0]

修改为

ActiveRecord::Base.with_connection do |conn|
  results = conn.execute(average_sql)[0]end
  1. count_by_sql方法
    通过上述修改,再次测试,发现在同样并行压力下依然会报错,但是报错位置从results = ActiveRecord::Base.connection.execute(average_sql)[0] 跳到 count_by_sql(four_hours_failed_sql) > 0 这个位置,这就奇怪了,本来觉得是因为手动调用ActiveRecord::Base.connection.execute()链接没有释放的原因,修改后怎么并发没有提升呢,就翻看了ActiveRecord的源码:

# ==== Parameters## * +sql+ - An SQL statement which should return a count query from the database, see the example below.##   Product.count_by_sql "SELECT COUNT(*) FROM sales s, customers c WHERE s.customer_id = c.id"def count_by_sql(sql)
  sql = sanitize_conditions(sql)
  connection.select_value(sql, "#{name} Count").to_iend

原来这里使用的也是connection方法,所以如上条,我们得把这里的查询也放进with_connection的block里面,所以最终代码是:

def average_by_time_range(device_id, time_s, time_e)
  average = {device_id: device_id, four_hours_lost: false, data_complete: data_complete}
  average_sql = "SELECT AVG(pm2p5) \"pm2p5\", AVG(co2) \"co2\", AVG(tvoc) \"tvoc\"  FROM monitor_hourly_readings WHERE device_id = #{device_id}  AND reading_time BETWEEN '#{time_s}' AND '#{time_e}' "
  ActiveRecord::Base.with_connection.do |conn|
    results = conn.execute(average_sql)[0] #调用sql
    [:pm2p5, :co2, :tvoc].each do |indicator_name|
      average_reading = results[indicator_name.to_s]      if !average_reading
        average[indicator_name] = nil
      elsif check_four_hours_lost?(device_id, indicator_name ,time_s, time_e) #调用判断方法
        average[indicator_name] = nil
        average[:four_hours_lost] = true
      else
        average[indicator_name] = average_reading.to_f      end

    end
  end
  averageend

经测试,确实有效提高了并发能力。

探讨

以上的测试因为没有完整定量测试报告,所以不给出具体数据了,然后这里想为什么ActiveRecord里面的find_by_sql, count_by_sql等默认使用connection方法而不是with_connection呢? 想一想可能也是为了优化操作效率,因为ActiveRecord的并行场景不常规,然后如果使用with_connection则反倒会增加每次的调用成本。然后另外一个问题是config/databse.yml中pool size该怎么设置呢,按照什么原则?比如server使用的puma,则可能对应于设置的thread数量,同时也得考虑sidekiq的concurrency数量。但是仅仅这样,为什么不尽量设置大一点的数字呢?可能是内存等资源的压力之间的平衡问题了,相必对于这个问题得再做深入的探讨和一些测试。



作者:charleschu
链接:https://www.jianshu.com/p/b8c54291e5e2

0人推荐
随时随地看视频
慕课网APP