CloudFoundry之DEA源码分析
DEA启动分析

ruby的代码结构一般都是一样的: bin目录是执行文件
conf是配置文件
lib则是ruby source
spec放置一些测试的文件
Gemfile+Gemfile.lock为bundler服务,让dea可以找到正确的gem依赖
Rakefile是用来做测试的 先看下dea的目录结构,bin 放的是脚本,等同shell的作用,脚本引用的是lib/dea.rb,而lib/dea.rb接着引用/lib/dea/agent.rb来完成请求响应,lib负责了所有的逻辑业务,可以说lib是工程的核心, config放配置文件,spec是一些test unit. 我们重点来看lib/dea.rb和agent.rb 1. lib/dea.rb IT论坛不用关注,就是接收参数,处理参数.来看最后几句: [size=0.95em]EM.epollEM.run { agent = DEA::Agent.new(config) agent.run()}
普及下EventMachine http://blog.csdn.net/resouer/article/details/7975550
EventMachine 开启epoll模式,默认为select,epoll比select高效,原因见这篇文章:http://eventmachine.rubyforge.org/docs/EPOLL.html,
CF中几乎所有的组件启动时,都用EM,所有还是有必要搞清楚.这里EM.run就是开启了这个I/O,整个启动操作都在EM中进行,为什么不用ruby的多线程?
这两句代码不用解释了,初始化Agent,然后执行run.赶紧来看run方法都做了些什么操作,由于代码比较多,我们挑重点来分析,但基本流程不会省. 1. 准备文件目录,以及处理参数 [size=0.95em] # Make sure all the correct directories exist. begin FileUtils.mkdir_p(@droplet_dir) FileUtils.mkdir_p(@staged_dir) FileUtils.mkdir_p(@apps_dir) FileUtils.mkdir_p(@db_dir) if @secure # Allow traversal by secure users FileUtils.chmod(0711, @apps_dir) FileUtils.chmod(0711, @droplet_dir) end rescue => e @logger.fatal("Can't create support directories: #{e}") exit 1 end
2. 循环地开启file_viewer服务,web服务器用thin,这里用到了EM.next_tick,指的是不用马上执行,EM自已调度,空闲的时候执行.因为这一步不太重要,让主进程让给更重要的事情吧. [size=0.95em] EM.next_tick do unless start_file_viewer # Periodically try to start the file viewer in case of port contention @filer_start_timer = EM.add_periodic_timer(1) do if start_file_viewer EM.cancel_timer(@filer_start_timer) @filer_start_timer = nil end end end end
3. EM定义了两个错误处理回调方法,暂时触发不了 [size=0.95em]#James 定义NATS的回调行为,这里是在NATS发生错误时,将droplet ID列表保存在db/application.json中,恢复时用. #默认recovered_droplets=false NATS.on_error do |e| @logger.error("EXITING! NATS error: #{e}") @logger.error(e) # Only snapshot app state if we had a chance to recover saved state. This prevents a connect error # that occurs before we can recover state from blowing existing data away. snapshot_app_state if @recovered_droplets exit! end EM.error_handler do |e| @logger.error "Eventmachine problem, #{e}" @logger.error(e) end
4.清理资源,理解为资源回收. [size=0.95em] # Calculate how much disk is available before we respond to any messages update_droplet_fs_usage(:blocking => true)
5. 接下来这个比较大头. [size=0.95em]NATS.start(:uri => @nats_uri) do # Register ourselves with the system status_config = @config['status'] || {} VCAP::Component.register(:type => 'DEA', :host => @local_ip, :index => @config['index'], :config => @config, :port => status_config['port'], :user => status_config['user'], :password => status_config['password']) uuid = VCAP::Component.uuid # Setup our identity @hello_message = { :id => uuid, :ip => @local_ip, :port => @file_viewer_port, :version => VERSION }.freeze @hello_message_json = @hello_message.to_json # Setup our listeners.. NATS.subscribe('dea.status') { |msg, reply| process_dea_status(msg, reply) } NATS.subscribe('droplet.status') { |msg, reply| process_droplet_status(msg, reply) } NATS.subscribe('dea.discover') { |msg, reply| process_dea_discover(msg, reply) } NATS.subscribe('dea.find.droplet') { |msg, reply| process_dea_find_droplet(msg, reply) } NATS.subscribe('dea.update') { |msg| process_dea_update(msg) } NATS.subscribe('dea.stop') { |msg| process_dea_stop(msg) } NATS.subscribe("dea.#{uuid}.start") { |msg| process_dea_start(msg) } NATS.subscribe('router.start') { |msg| process_router_start(msg) } NATS.subscribe('healthmanager.start') { |msg| process_healthmanager_start(msg) } NATS.subscribe('dea.locate') { |msg| process_dea_locate(msg) } # Recover existing application state. recover_existing_droplets delete_untracked_instance_dirs EM.add_periodic_timer(@heartbeat_interval) { send_heartbeat } EM.add_periodic_timer(@advertise_interval) { send_advertise } EM.add_timer(MONITOR_INTERVAL) { monitor_apps } EM.add_periodic_timer(CRASHES_REAPER_INTERVAL) { crashes_reaper } EM.add_periodic_timer(VARZ_UPDATE_INTERVAL) { snapshot_varz } EM.add_periodic_timer(DROPLET_FS_PERCENT_USED_UPDATE_INTERVAL) { update_droplet_fs_usage } NATS.publish('dea.start', @hello_message_json) send_advertise end
连接上NATS服务,这一步是必须而且优先的,因为CF的通信全靠它了; 然后弄个回调方法,向VCAP注册自已,每个组件启动时都要向VCAP注册自已,全局监控用.至于怎么监控.查看代码以及官方文档,说是注册后,就会给这个组件启个服务(thin server),用于CF周期性地发http请求来知晓各组件状态,http URL 分为两个,一个是/healthz,简单地返回OK不OK; /varz则返回一堆运行时参数.但从源代码上并不是这么回事.不是所有组件都启thin服务,再深入研究吧. 接下来是订阅一堆NATS message,这也是各个组件启动时要做的事情.各个sub_message都有block来处理,有些要返回值,有些不用. recover_existing_droplets,如果是重启,并开启了恢复机制,这个就是恢复之前运行正常的droplet.当dea 重启时,这些上次正常运行的droplet是如何记录下来的呢,看看第3步snapshot_app_state if @recovered_droplets, 这个是说当nat 出错了或是啥地,把droplets 快照一份保存起来.供下一次重启时读取. delete_untracked_instance_dirs,删掉那些僵死的或是无法追踪的droplet,然后发布一堆设置一堆timer,周期性地发NATS message,有heartbeat,有monitor message,反正是让IT论坛组件知道些事情.关于组件间的NATS message交互,请阅读http://apidocs.cloudfoundry.com/的NATS message,发布dea.start,是发给cc的,告诉它,已经准备好了,随时可以接收上级指示. send_advertise,就是发布dea.advertise,也是给CC的. 总结, dea 启动时,向vcap注册自已,然后订阅,发布消息. 要知道dea都有哪些功能,其它就看他的NATS message就知道了. 参考http://apidocs.cloudfoundry.com/ 的NATS message
DEA功能点分析1. 启动实例
对应的nats message是 dea.uuid.start。流程最为复杂。 (1)简单说就是先计算资源够不够用,
(2)然后添加至droplets数组中,给IT论坛组件发信息时用到,还有就是快照,dea挂掉重启时,根据快照的droplet id来恢复各droplet。
(3)根据收到的message,去download droplet。这里有cache和mount的处理判断。如果本地已经down下来,或是dea server跟cc的stager目录有mount,直接就可以搞。
(4)判断runtime是不是符合,如果配置文件里没说明到位,还有个runtime自发现的过程。
(5)droplet拉回来了,runtime也OK了,就安排启动需要的端口之类的,看看有没什么要先执行的。没有的话,就export一堆env,再执行startup 2. 停止实例
对应的nats message是 dea.stop。 回收资源,删掉droplet,从droplets数组中剔除等。 3. 启动dea
调用/lib/dea.rb===>agent.run(),见本文的第一部分详解。 4. 停止dea
调用vcap_dev stop dea ==>vcap ==>vcap_components.stop,其实就是直接kill -9,还有几个kill TEAM之类的,代码里埋了一些触发点 trap("xx"). 5. 更新droplet
对应nats message是 dea.update,看代码里只是更新uris,就是域名。 6. 查看droplet信息
对应dea.find.droplet。返回droplet的详细信息,包括droplet所处服务器的ip,port,log路径之类的。CC可以据此拼出log地址,从而读取。 7. 发送心跳
向hm汇报此dea的所有droplet情况,必须全部的droplet都要汇报。hm拿这些数据跟CCD里的数据相比较,如果有不一致的情况,说明有问题,告知cc,cc会做相应的操作。 8. 向router注册
向router汇报此dea的所有droplet情况,必须全部的droplet都要汇报,router根据此来判断droplet是否正常(一定时间内无消息返回,就视为无效,下回请求就不会路由过去了)。 9. 回应cc的discover请求
当cc在cache里找不到合适的dea启动instance,就发个discover广播,所有dea都订阅,而且设置了立马reply。各DEA视自已的资源是否满足要求,满足的话就回个话。CC用收到的第一个dea来启动实例。dea回话的时候,有个延迟策略,即计算自身的已启动的instance数量,内存使用量,延迟一定时间再回话,不当出头鸟,但最大延迟是0.25秒。 DEA资源隔离
1. 先介绍当前vcap的资源隔离方法 首先,要开启secure模式,在config/dea.yml里有设置,同时必须是以root权限启的vcap。
vcap会选择一个安全用户来启动这个droplet,但一般是没有的,vcap会自动给创建,这些用户是以“vcap_user”开头的,没有home目录,没有密码之类的。默认的user group是“vcap_group”,查看secure.create_default_group方法: [size=0.95em] def create_default_group # Add in default group cmd = "addgroup --system #{DEFAULT_SECURE_GROUP} > /dev/null 2<&1" if isLinux cmd = "dscl . -create /Groups/#{DEFAULT_SECURE_GROUP} PrimaryGroupID #{SECURE_UID_BASE}" if isMacOSX system(cmd) end
[size=0.95em] args = [ "adduser", "--firstuid %d" % (SECURE_UID_BASE + 1), "--ingroup %s" % DEFAULT_SECURE_GROUP, "--home /nonexistent", "--no-create-home", "--shell /bin/sh", "--disabled-login", "--gecos ''", "--quiet", username, ]
然后,选一个用户,记录在instance属性中,instance:user=user,把相关的目录权限赋给这个用户,并且把fetch回来的droplet所在的目录及文件权限,所属者等,都给此用户。最后,启动的时候,以此用户启动,带上一些ulimit参数来限制这个droplet使用的资源。 [size=0.95em] if @secure || @enforce_ulimit process.send_data("renice 0 $$\n") process.send_data("ulimit -m #{mem_kbytes} 2> /dev/null\n") # ulimit -m takes kb, soft enforce process.send_data("ulimit -v 3000000 2> /dev/null\n") # virtual memory at 3G, this will be enforced process.send_data("ulimit -n #{num_fds} 2> /dev/null\n") process.send_data("ulimit -u 512 2> /dev/null\n") # processes/threads process.send_data("ulimit -f #{disk_limit} 2> /dev/null\n") # File size to complete disk usage process.send_data("umask 077\n") end
同时我们发现在非安全模式下,是没有任何限制的,droplet可以随便占满内存,用尽cpu,霸占磁盘空间...。这里解释几个参数吧,-m 是可使用最大内存,-v 最大虚拟内存,-n 一次最大的文件打开数量,-u 线程数, -f 单次打开的最大文件,不是磁盘限制。 用ulimit方式,可以简单地实现资源隔离,但是有很大的问题。 (1) kill,当进程的资源超过限制,此进程会被系统无情kill掉。会被health manager知道,health manager告知cc,cc会找个dea来启动,某种条件下,会导致无休止的重启--挂-重启--挂。
(2) 没有真正做到各种资源隔离,就像vm层面的一样。如cpu,io,namespace等。
(3) 继续添加。。。 资源隔离是多么重要,所有dea将这个功能单独移出来,放在另外的github目录上: https://github.com/cloudfoundry/warden warden是采用cgroup:http://en.wikipedia.org/wiki/Cgroups ,用vm代价太大,用LXC也是,同时CF想将wardean做到可移值。可以说cgroup是个轻量级的虚拟化。简洁,不会像vm那么复杂,同时拥有跟宿主机一样的速度。具体的介绍请看源码,warden的安装使用将参见: warden 安装 http://blog.csdn.net/k_james/article/details/8523864 warden使用与原理分析 http://blog.csdn.net/k_james/article/details/8523934 代码难点列举在CF中,eventmachine和Fiber很受欢迎。
1. EM.run
这个在第一部分也讲过,有两篇文章不错,建议看看。EM.run说明包含在内的动作都是在这个框架下执行,具有很高的并发量等好处。 [size=0.95em] #指定用epoll模式,而不是select或其它,可能在不久的将来会成为默认。 EM.epoll EM.run { agent = DEA::Agent.new(config) agent.run() }
2. EM.system
EM.run 内部,若又来个EM.system,意思是EM.defer,这段代码很核心,又要执行很久,放到后台去。如果后面还有IT论坛逻辑要做,就不能用EM.system了,简单用system。 [size=0.95em] EM.system('/bin/sh', du_proc, cont_proc)
3. EM.next_tick do
也在EM.run内部,意思是这段代码不是很着急执行,可以交给进程自己调度,有空了再执行。 [size=0.95em] EM.next_tick do unless start_file_viewer # Periodically try to start the file viewer in case of port contention @filer_start_timer = EM.add_periodic_timer(1) do if start_file_viewer EM.cancel_timer(@filer_start_timer) @filer_start_timer = nil end end end end
4. Bundler.with_clean_env
这个是说,要在ruby进程中再开个进程,但又不受ruby进程的影响,Bundler.with_clean_env会清空ENV,相当于可以在一段干净的空间里做些事情。
跟EM.system配合使用,效果更佳。 [size=0.95em] Bundler.with_clean_env { EM.system("#{@dea_ruby} -- #{prepare_script} true #{sh_command}", exec_operation, exit_operation) }
5. Fiber
这个类似线程,但不同的是它可以挂起,暂停等操作。 [size=0.95em] f = Fiber.new do ... ... r.resume r.yelid
本文到此结束,有兴趣的欢迎一起研究讨论,微博:http://weibo.com/kingjames3
来源:http://www.csdn123.com/html/mycs ... d8a3f208c98feb.html
CloudFoundry之DEA源码分析
|