Archive for May, 2013

作者:AngryFox 分类: Uncategorized May 31st, 2013 暂无评论

1.把图片用二进制存入数据库中
MYSQL 是支持把图片存入数据库的,也相应的有一个专门的字段 BLOB (Binary Large Object),即较大的二进制对象
还有个更大的存二进制的LONGBLOB;

#!/usr/local/bin/python2.7
#-*- coding: UTF-8 -*-

import MySQLdb as mdb
import sys 

try:
    fin = open("./web.jpg")
    img = fin.read()
    fin.close()

except IOError,e:
    print "Meet Error %d %s" % (e.args[0],e.args[1])
    sys.exit(1)

try:
    conn = mdb.connect(host='localhost',user='angryfox',passwd='ofwho',db='test')
    cursor = conn.cursor()
    cursor.execute("INSERT INTO images SET data='%s'" % mdb.escape_string(img))
    #conn.commit()

    cursor.close()
    conn.close()

except mdb.Error,e:
    print "Meet Error %d %s" % (e.args[0],e.args[1])
    sys.exit(1)
作者:AngryFox 分类: Uncategorized May 31st, 2013 暂无评论

proxy_cache和fastcgi_cache构成了Nginx的缓存,proxy_cache主要用于反向代理时,对后端内容源服务器进行缓存,可能是任何内容,包括静态的和动态,缓存减少了nginx与后端通信的次数,节省了传输时间和后端宽带;fastcgi_cache主要用于对FastCGI的动态程序进行缓存,很多情况是php生成的动态的内容,fastcgi_cache缓存减少了nginx与php的通信的次数,更减轻了php和数据库(mysql)的压力,这比用memcached之类的缓存要轻松得多。。两者的功能基本上一样。

在功能上,Nginx已经具备Squid所拥有的Web缓存加速功能、清除指定URL缓存的功能。而在性能上,Nginx对多核CPU的利用,胜过Squid不少。另外,在反向代理、负载均衡、健康检查、后端服务器故障转移、Rewrite重写、易用性上,Nginx也比Squid强大得多。这使得一台Nginx可以同时作为“负载均衡服务器”与“Web缓存服务器”来使用。

所以可以根据实际情况结合使用proxy_cache和fastcgi_cache来架构Nginx的负载均衡系统。

FastCGI 的技术原理就要了解何为”短生存期应用程序”,何为”长生存期应用程序”。先从 CGI 技术开刀,以下是 CGI 技术的理论:每次当客户请求一个 CGI 的时候,Web 服务器就请求操作系统生成一个新的 CGI 进程。当 CGI 满足要求后,服务器就杀死这个进程。服务器对客户端的每个请求都要重复这样的过程。而 FastCGI 技术的理论为:FastCGI 程序一旦产生后,他可以持续工作,足够满足客户的请求直到被明确的终止。如果你希望通过协同处理来提高程序的性能,你可以请求 Web 服务器运行多个 FastCGI应用程序的副本。CGI 就是所谓的短生存期应用程序,FastCGI 就是所谓的长生存期应用程序。由于 FastCGI 程序并不需要不断的产生新进程,可以大大降低服务器的压力。并且产生较高的应用效率。
很多时候一个页面由多个数据片断组成,为了提高页面速度,要么分别缓存,要么整体缓存(所谓的Page Cache)
关于Nginx fastcgi_cache,基础的可以参看Nginx官方文档http://wiki.nginx.org/HttpFcgiModule,下面是一个典型的做法是:

fastcgi_temp_path /data/ngx_fcgi_tmp;
fastcgi_cache_path /data/ngx_fcgi_cache levels=2:2 keys_zone=ngx_fcgi_cache:512m inactive=1d max_size=40g;
fastcgi_cache_valid 200 301 302 1d;
fastcgi_cache_use_stale error timeout invalid_header http_500;
fastcgi_cache_key http://$host$request_uri;

大概解释下各个参数的含义:

  fastcgi_temp_path:生成fastcgi_cache临时文件目录,fastcgi_cache_path:fastcgi_cache缓存目录,可以设置目录哈希层级,比如2:2会生成256*256个字目录,keys_zone是这个缓存空间的名字,cache是用多少内存(这样热门的内容nginx直接放内存,提高访问速度),inactive表示默认失效时间,max_size表示最多用多少硬盘空间,需要注意的是fastcgi_cache缓存是先写在fastcgi_temp_path再移到fastcgi_cache_path,所以这两个目录最好在同一个分区,从0.8.9之后可以在不同的分区,不过还是建议放同一分区。
  fastcgi_cache_valid:定义哪些http头要缓存,fastcgi_cache_use_stale:定义哪些情况下用过期缓存
fastcgi_cache_key:定义fastcgi_cache的key,示例中就以请求的URI作为缓存的key,Nginx会取这个key的md5作为缓存文件,如果设置了缓存哈希目录,Nginx会从后往前取相应的位数做为目录。
  fastcgi_cache:用哪个缓存空间
如何手动清除缓存?有个Nginx的第三方扩展可帮你做到:https://github.com/FRiCKLE/ngx_cache_purge/
直接找到缓存文件,然后删掉就可以;
Nginx fastcgi_cache缓存很不错,但我只想在某些页面用fastcgi_cache,很简单,有两种方法,一是在location中定义fastcgi_cache,这样只有满足一定规则的url才会用上cache,其他的就不会了;另外一种方法是在你不需要缓存的页面上,输出禁止缓存的头信息,用ColaPHP的话,直接$this->response->disableBrowserCache(); 具体代码:

header("Expires: Mon, 26 Jul 1997 05:00:00 GMT");
header("Last-Modified: " . gmdate("D, d M Y H:i:s") . " GMT");
header("Cache-Control: no-store, no-cache, must-revalidate");
header("Cache-Control: post-check=0, pre-check=0", false);
header("Pragma: no-cache");

大部分都是Memcache、Xcache、Proxy_Cache,FastCgi Cache很少采用
配置方法很简单
1、新建文件夹存放缓存文件
mkdir /home/cache/fcgi -p (www的权限)
2、修改nginx.conf文件(LNMP一键包目录在/usr/local/nginx/conf/nginx.conf),在http层加入:
fastcgi_cache_path /home/cache/fcgi levels=1:2 keys_zone=fcgi:15m inactive=1d max_size=1g;
#15m为内存占用 1g为硬盘最大占用空间
3、修改位于vhost目录下的站点文件

		location ~ .*\.(php|php5)?$
			{
				#try_files $uri =404;
				fastcgi_pass  unix:/tmp/php-cgi.sock;
				#fastcgi_index index.php;
				include fcgi.conf;
				#fastcgi_pass   127.0.0.1:9000;
				fastcgi_index  index.php;
				fastcgi_param  SCRIPT_FILENAME  /home/wwwroot/$fastcgi_script_name;  #网站请求的根目录
				include        fastcgi_params;
				fastcgi_cache fcgi;
				fastcgi_cache_valid 200 302 301 1h;
				fastcgi_cache_valid any 1m;
				fastcgi_cache_min_uses 1;
				fastcgi_cache_use_stale error timeout invalid_header http_500;
				fastcgi_cache_key $request_method:$host$request_uri;
			}
/etc/init.d/nginx restart

5、打开站点,看缓存目录是否增大
du -sh /home/cache/fcgi

作者:AngryFox 分类: Uncategorized May 30th, 2013 暂无评论

Linux系统网络服务器模型主要有两种:并发服务器和循环服务器。所谓并发服务器就是在同一个时刻可以处理来自多个客户端的请求;循环服务器是指服务器在同一时刻指可以响应一个客户端的请求。而且对于TCP和UDP套接字,这两种服务器的实现方式也有不同的特点。
1、TCP循环服务器:首先TCP服务器接受一个客户端的连接请求,处理连接请求,在完成这个客户端的所有请求后断开连接,然后再接受下一个客户端的请求。
创建TCP循环服务器的算法如下:
socket(……); //创建一个TCP套接字
bind(……); //邦定公认的端口号
listen(……); //倾听客户端连接
while(1) //开始循环接收客户端连接
{
accept(……);//接收当前客户端的连接
while(1)
{ //处理当前客户端的请求
read(……);
process(……);
write(……);
}
close(……); //关闭当前客户端的连接,准备接收下一个客户端连接
}
TCP循环服务器一次只处理一个客户端的请求,如果有一个客户端占用服务器不放时,其它的客户机连接请求都得不到及时的响应。因此,TCP服务器一般很少用循环服务器模型的。
2、TCP并发服务器:并发服务器的思想是每一个客户端的请求并不由服务器的主进程直接处理,而是服务器主进程创建一个子进程来处理。
创建TCP并发服务器的算法如下:
socket(……); //创建一个TCP套接字
bind(……); //邦定公认的端口号
listen(……);//倾听客户端连接
while(1) //开始循环接收客户端的接收
{
accept(……);//接收一个客户端的连接
if(fork(……)==0) //创建子进程
{
while(1)
{ //子进程处理某个客户端的连接
read(……);
process(……);
write(……);
}
close(……); //关闭子进程处理的客户端连接
exit(……) ;//终止该子进程
}
close(……); //父进程关闭连接套接字描述符,准备接收下一个客户端连接
}
TCP并发服务器可以解决TCP循环服务器客户端独占服务器的情况。但同时也带来了一个不小的问题,即响应客户机的请求,服务器要创建子进程来处理,而创建子进程是一种非常消耗资源的操作。
3、UDP循环服务器:UDP服务器每次从套接字上读取一个客户端的数据报请求,处理接收到的UDP数据报,然后将结果返回给客户机。
创建UDP循环服务器的算法如下:
socket(……); //创建一个数据报类型的套接字
bind(……); //邦定公认的短口号
while(1) //开始接收客户端的连接
{ //接收和处理客户端的UDP数据报
recvfrom(……);
process(……);
sendto(……);
//准备接收下一个客户机的数据报
}
因为UDP是非面向连接的,没有一个客户端可以独占服务器。只要处理过程不是死循环,服务器对于每一个客户机的请求总是能够处理的。
UDP循环服务器在数据报流量过大时由于处理任务繁重可能造成客户技数据报丢失,但是因为UDP协议本身不保证数据报可靠到达,所以UDP协议是尤许丢失数据报的。
鉴于以上两点,一般的UDP服务器采用循环方式
4、UDP并发服务器
把并发的概念应用UDP就得到了并发UDP服务器,和并发TCP服务器模型一样是创建子进程来处理的。
创建UDP并发服务器的算法如下:
socket(……); //创建一个数据报类型的套接字
bind(……); //邦定公认的短口号
while(1) //开始接收客户端的连接
{ //接收和处理客户端的UDP数据报
recvfrom(……);
if(fork(……)==0) //创建子进程
{
process(……);
sendto(……);
}
}
除非服务器在处理客户端的请求所用的时间比较长以外,人们实际上很少用这种UDP并发服务器模型的。
5、多路复用I/O并发服务器:创建子进程会带来系统资源的大量消耗,为了解决这个问题,采用多路复用I/O模型的并发服务器。采用select函数创建多路复用I/O模型的并发服务器的算法如下:
初始化(socket,bind,listen);
while(1)
{
设置监听读写文件描述符(FD_*);
调用select;
如果是倾听套接字就绪,说明一个新的连接请求建立
{
建立连接(accept);
加入到监听文件描述符中去;
}
否则说明是一个已经连接过的描述符
{
进行操作(read或者write);
}
}
多路复用I/O可以解决资源限制问题,此模型实际上是将UDP循环模型用在了TCP上面。这也会带了一些问题,如由于服务器依次处理客户的请求,所以可能导致友的客户会等待很久。

作者:AngryFox 分类: Uncategorized May 29th, 2013 暂无评论

1)api所使用协议,要能够不怕被加js、加广告和各种代码。
全国各地的运营商,有习惯挂广告、加代码等等,要做的事情就是在客户端做各种解析判断。实践证明,这帮孙子的设备是检测http协议的head头里的content-type,如果是html/text则会加,如果是application/json.就不加了,所以,你懂的。

2)api可视化
你的app上架了,后端也布好了。然后呢,就天天看下载量。但后端好坏一问三不知。所以在设计api时,要提前回答以下问题:
1.此时此刻,有多少个app正在调用这些api,每分钟多少个?
2.他们(api)有快?
3.能否很简单地通过浏览器快速debug?
4.能否快速禁止单个用户?

3)链路
在幅员辽阔的我国,总是一个地方到一个地方的IDC直接就ping不通(移动互联网下更甚)。于是需要花钱自己打通这些地方,没有专线,只有代理再代理。

4)api quota与perfcounter
对api进行quota限制,针对每个api每个人,都有限额。
这个限制的传统做法:
1.每天的上限调用次数。每天半夜清空,一天一个量累计。
2.每分钟的频度。这个防止有恶性的突发情况。
上述二者,缺一不可。
perfcounter用于对api的监控设计指标,如果一个api有异常,应当十分灵敏地得出结论,而不是误报连连。

作者:AngryFox 分类: Uncategorized May 29th, 2013 暂无评论
# -*- encoding: utf-8 -*-
'''
示例代码中实现了两个类:生产者类Producer和消费者类Consumer。前者在一个随机的时间内放入一个值到队列queue中然后显示出来,
后者在一定随机的时间内从队列queue中取出一个值并显示出来。
'''
from Queue import Queue
import threading
import random
import time

# Producer thread
class Producer(threading.Thread):
	def __init__(self, threadname, queue):
		threading.Thread.__init__(self, name = threadname)
		self.q = queue
	def run(self):
		for i in list(xrange(20)):
			print self.getName(),'adding',i,'to queue'
			self.q.put(i)
			time.sleep(random.randrange(10)/10.0)
			print self.getName(),'Finished'

# Consumer thread
class Consumer(threading.Thread):
	def __init__(self, threadname, queue):
		threading.Thread.__init__(self, name = threadname)
		self.q = queue
	def run(self):
		for i in list(xrange(25)):
			if not self.q.empty():
				print self.getName(),'got a value:',self.q.get()
			else:
				print self.getName(),'no value to get'
			time.sleep(random.randrange(10)/10.0)
			print self.getName(),'Finished'

# Main thread
def run():
	queue = Queue()
	producer = Producer('p', queue)
	consumer = Consumer('c', queue)

	print 'Starting threads ...'
	producer.start()
	consumer.start()

	producer.join()
	consumer.join()
	print 'All threads have terminated.'
'''
阻塞进程直到线程执行完毕。通用的做法是我们启动一批线程,最后join这些线程结束;
join的原理就是依次检验线程池中的线程是否结束,没有结束就阻塞直到线程结束,如果结束则跳转执行下一个线程的join函数
而py的join函数还有一个特殊的功能就是可以设置超时
阻塞指的是暂停一个线程的执行以等待某个条件发生(如某资源就绪)
在线程start的时候进行join操作,那么主线程会等待其jion的所有线程,直到线程都退出后才有往下执行,如果线程中有死循环那么此主线程会一直堵塞,所有要注意容错处理
'''

if __name__ == '__main__':
	run()
作者:AngryFox 分类: Uncategorized May 28th, 2013 暂无评论

考虑到Python的全局解释器锁(Global Interceptor Lock, GIL)。由于GIL的存在,在CPU密集型的程序当中,使用多线程并不能有效地利用多核CPU的优势,因为一个解释器在同一时刻只会有一个线程在执行。要想尽可能利用多核CPU并发,多进程是必需的。
引入多进程就带来几个问题:首先是这种设计方案需要一个类似于UNIX Pipe的底层结构,说的更确切一点就是message queue,操作类似于Python的Queue.Queue。另一个问题就是需要一个进程间信息共享的基础设施。

先说Queue,这个Queue需要满足下面一些需求,优先级递减:

轻量 – 不需要依赖数据库等比较重型的程序。代价就是丢点东西没关系;
跨进程 – 读端和写端可以不在一个进程当中;
支持多进多出 – 读端和写端都要支持多个进程;
稳定和高效 – 这个就不用多说了吧;
有Python的API,再不济要有C的API可以让我写Python的binding。

另外是进程间共享的设施,系统shm是迫不得已使用的,因为要自己处理Pickling和Unpickling,以及一些杂七杂八的竞争条件问题。有建构在上面的现成的库最好。出于轻量的考虑,选定了Python的multiprocessing库,而没有用IPC MQ 或 RabbitMQ

import pickle
data = {'foo':{1,2,3},'bar':True}
jar = open('data.pkl','wb')
pickle.dump(data, jar)
jar.close()

import pickle
jar_file  = open('data.pkl','rb')
data = pickle.load(jar_file)
print data
在mp库当中,跨进程对象共享有三种方式,第一种仅适用于原生机器类型,即python.ctypes当中的类型,这种在mp库的文档当中称为shared memory方式,即通过共享内存共享对象;另外一种称之为server process,即有一个服务器进程负责维护所有的对象,而其他进程连接到该进程,通过代理对象操作服务器进程当中的对象;最后一种在mp文档当中没有单独提出,但是在其中多次提到,而且是mp库当中最重要的一种共享方式,称为inheritance,即继承,对象在父进程当中创建,然后在父进程是通过multiprocessing.Process创建子进程之后,子进程自动继承了父进程当中的对象,并且子进程对这些对象的操作都是反映到了同一个对象。

这三者共享方式各有特色,在这里进行一些简单的比较。

首先是共享方式所应对的对象类型,看这个表:
共享方式 	支持的类型
Shared memory 	ctypes当中的类型,通过RawValue,RawArray等包装类提供
Inheritance 	系统内核对象,以及基于这些对象实现的对象。包括Pipe, Queue, JoinableQueue, 同步对象(Semaphore, Lock, RLock, Condition, Event等等)
Server process 	所有对象,可能需要自己手工提供代理对象(Proxy)

这个表总结了三种不同的共享方式所支持的类型,下面一个个展开讨论。

其中最单纯简单的就是shared memory这种方式,只有ctypes当中的数据类型可以通过这种方式共享。由于mp库本身缺少命名的机制,即在一个进程当中创建的对象,无法在另外一个进程当中通过名字来引用,因此,这种共享方式依赖于继承,对象应该由父进程创建,然后由子进程引用。关于这种机制的例子,可以参见Python文档当中的例子 Synchronization types like locks, conditions and queues,参考其中的test_sharedvalues函数。

然后是继承方式。首先关于继承方式需要有说明,继承本质上并不是一种对象共享的机制,对象共享只是其副作用。子进程从父进程继承来的对象并不一定是共享的。继承本质上是父进程fork出的子进程自动继承父进程的内存状态和对象描述符。因此,实际上子进程复制了一份父进程的对象,只不过,当这个对象包装了一些系统内核对象的描述符的时候,拷贝这个对象(及其包装的描述符)实现了对象的共享。因此,在上面的表当中,只有系统内核对象,和基于这些对象实现的对象,才能够通过继承来共享。通过继承共享的对象在linux平台上没有任何限制,但是在Windows上面由于没有fork的实现,因此有一些额外的限制条件,因此,在Windows上面,继承方式是几乎无法用的。

最后就是Server Process这种方式。这种方式可以支持的类型比另外两种都多,因为其模型是这样的:
server process模型

server process模型

在这个模型当中,有一个manager进程,负责管理实际的对象。真正的对象也是在manager进程的内存空间当中。所有需要访问该对象的进程都需要先连接到该管理进程,然后获取到对象的一个代理对象(Proxy object),通常情况下,这个代理对象提供了实际对象的公共函数的代理,将函数参数进行pickle,然后通过连接传送到管理进程当中,管理进程将参数unpickle之后,转发给相应的实际对象的函数,返回值(或者异常)同样经过管理进程pickle之后,通过连接传回到客户进程,再由proxy对象进行unpickle,返回给调用者或者抛出异常。

很明显,这个模型是一个典型的RPC(远程过程调用)的模型。因为每个客户进程实际上都是在访问manager进程当中的对象,因此完全可以通过这个实现对象共享。

manager和proxy之间的连接可以是基于socket的网络连接,也可以是unix pipe。如果是使用基于socket的连接方式,在使用proxy之前,需要调用manager对象的connect函数与远程的manager进程建立连接。由于manager进程会打开端口接收该连接,因此必要的身份验证是需要的,否则任何人都可以连上manager弄乱你的共享对象。mp库通过authkey的方式来进行身份验证。

在实现当中,manager进程通过multiprocessing.Manager类或者BaseManager的子类实现。BaseManager提供了函数register注册一个函数来获取共享对象的proxy。这个函数会被客户进程调用,然后在manager进程当中执行。这个函数可以返回一个共享的对象(对所有的调用返回同一个对象),或者可以为每一个调用创建一个新的对象,通过前者就可以实现多个进程共享一个对象。关于这个的用法可以参考Python文档当中的例子“Demonstration of how to create and use customized managers and proxies”。

典型的导出一个共享对象的代码是:

ObjectType object_
class ObjectManager(multiprocessing.managers.BaseManager): pass
ObjectManager.register("object", lambda: object_)

注意上面介绍proxy对象的时候,我提到的“公共函数”四个字。每个proxy对象只会导出实际对象的公共函数。这里面有两个含义,一个是“公共”,即所有非下划线开头的成员,另一个是“函数”,即所有callable的成员。这就带来一些限制,一是无法导出属性,二是无法导出一些公共的特殊函数,例如__get__, __next__等等。对于这个mp库有一套处理,即自定义proxy对象。首先是BaseManager的register可以提供一个proxy_type作为第三个参数,这个参数指定了哪些成员需要被导出。详细的使用方法可以参见文档当中的第一个例子。

另外manager还有一些细节的问题需要注意。由于Proxy对象不是线程安全的,因此如果需要在一个多线程程序当中使用proxy,mp库会为每个线程创建一个proxy对象,而每个proxy对象都会对server process创建一个连接,而manager那边对于每个连接都创建一个单独的线程来为其服务。这样带来的问题就是,如果客户进程有很多线程,很容易会导致manager进程的fd数目达到ulimit的限制,即使没有达到限制,也会因为manager进程当中有太多线程而严重影响manager的性能。解决方案可以是一个进程内cache,只有一个单独的线程可以创建proxy对象访问共享对象,其余线程只能访问该进程当中的cache。

一旦manager因为达到ulimit限制或者其他异常,manager会直接退出,遗憾的是,这时候已经建立的proxy会试图重新连接manager – 但是它已经不存在了。这个会导致客户进程hang在对proxy的函数调用上,这个时候,目前除了杀掉进程没有找到别的办法。

另外proxy使用socket的方式比较tricky,因此和内置的socket库有很多冲突,比如socket.setdefaulttimeout(Python Issue 6056 )。在setdefaulttimeout调用了之后,进程当中所有通过socket模块建立的socket都是被设置为unblock模式的,但是mp库并不知道这一点,而且它总是假设socket都是block模式的,于是,一旦调用了setdefaulttimeout,所有对于proxy的函数调用都会抛出OSError,错误代码为11,错误原因是非常有误导性的“Resource temporarily unavailable”,实际上就是EAGAIN。这个错误可以通过我提供的一个patch来补救(这个patch当中还包含其他的一些修复,所以请自行查看并修改该patch)。

由于以上的一些原因,server process模式作为一个对象的共享模式,能够提供最为灵活的共享方式,但是也有最多的问题。这个在使用过程当中就靠自己去衡量了。目前我们的系统对于数据可靠性方面要求不高,丢失数据是可以接受的,但是也只用这种模式来维护统计值,不敢用来维护更多的东西。
jar_file.close()

multiprocessing的目的是创建一个接口和python.threading类似接口的库,用多进程的方式来并发处理

import multiprocessing

def dosomething(a,b,c):
    print a,b,c

p = multiprocessing.Process(target=dosomething,args=(1,2,3))
p.start()
print 1
p.join()

创建的这个新的进程在*nix上面使用的是fork,这意味着子进程开始执行的时候具有与父进程相同的全部内容。所谓基于继承的对象共享,是说在创建子进程之前由父进程初始化的某些对象可以在子进程当中直接访问到。在Windows平台上,因为没有fork语义的系统调用,基于继承的共享对象比*nix有更多的限制,最主要就是体现在要求Process的__init__当中的参数必须可以Pickle。但是,并不是所有的对象都是可以通过继承来共享,只有multiprocessing库当中的某些对象才可以。例如Queue,同步对象,共享变量,Manager等等。
在一个multiprocessing库的典型使用场景下,所有的子进程都是由一个父进程启动起来的,这个父进程称为master进程。这个父进程非常重要,它会管理一系列的对象状态,一旦这个进程退出,子进程很可能会处于一个很不稳定的状态,因为它们共享的状态也许已经被损坏掉了。因此,这个进程最好尽可能做最少的事情,以便保持其稳定性。

 

在mp库当中,跨进程对象共享有三种方式,第一种仅适用于原生机器类型,即python.ctypes当中的类型,这种在mp库的文档当中称为shared memory方式,即通过共享内存共享对象;另外一种称之为server process,即有一个服务器进程负责维护所有的对象,而其他进程连接到该进程,通过代理对象操作服务器进程当中的对象;最后一种在mp文档当中没有单独提出,但是在其中多次提到,而且是mp库当中最重要的一种共享方式,称为inheritance,即继承,对象在父进程当中创建,然后在父进程是通过multiprocessing.Process创建子进程之后,子进程自动继承了父进程当中的对象,并且子进程对这些对象的操作都是反映到了同一个对象。

这三者共享方式各有特色,在这里进行一些简单的比较。

首先是共享方式所应对的对象类型,看这个表:

共享方式 支持的类型
Shared memory ctypes当中的类型,通过RawValue,RawArray等包装类提供
Inheritance 系统内核对象,以及基于这些对象实现的对象。包括Pipe, Queue, JoinableQueue, 同步对象(Semaphore, Lock, RLock, Condition, Event等等)
Server process 所有对象,可能需要自己手工提供代理对象(Proxy)

这个表总结了三种不同的共享方式所支持的类型,下面一个个展开讨论。

其中最单纯简单的就是shared memory这种方式,只有ctypes当中的数据类型可以通过这种方式共享。由于mp库本身缺少命名的机制,即在一个进程当中创建的对象,无法在另外一 个进程当中通过名字来引用,因此,这种共享方式依赖于继承,对象应该由父进程创建,然后由子进程引用。关于这种机制的例子,可以参见Python文档当中的例子 Synchronization types like locks, conditions and queues,参考其中的test_sharedvalues函数。

然后是继承方式。首先关于继承方式需要有说明,继承本质上并不是一种对象共享的机制,对象共享只是其副作用。子进程从父进程继承来的对象并不一定是共享的。继承本质上是父进程fork出的子进程自动继承父进程的内存状态和对象描述符。因此,实际上子进程复制了 一份父进程的对象,只不过,当这个对象包装了一些系统内核对象的描述符的时候,拷贝这个对象(及其包装的描述符)实现了对象的共享。因此,在上面的表当 中,只有系统内核对象,和基于这些对象实现的对象,才能够通过继承来共享。通过继承共享的对象在linux平台上没有任何限制,但是在Windows上面 由于没有fork的实现,因此有一些额外的限制条件,因此,在Windows上面,继承方式是几乎无法用的。

最后就是Server Process这种方式。这种方式可以支持的类型比另外两种都多,因为其模型是这样的:

server process模型server process模型

在这个模型当中,有一个manager进程,负责管理实际的对象。真正的对象也是在manager进程的内存空间当中。所有需要访问该对象的进程都 需要先连接到该管理进程,然后获取到对象的一个代理对象(Proxy object),通常情况下,这个代理对象提供了实际对象的公共函数的 代理,将函数参数进行pickle,然后通过连接传送到管理进程当中,管理进程将参数unpickle之后,转发给相应的实际对象的函数,返回值(或者异 常)同样经过管理进程pickle之后,通过连接传回到客户进程,再由proxy对象进行unpickle,返回给调用者或者抛出异常。

很明显,这个模型是一个典型的RPC(远程过程调用)的模型。因为每个客户进程实际上都是在访问manager进程当中的对象,因此完全可以通过这个实现对象共享。

manager和proxy之间的连接可以是基于socket的网络连接,也可以是unix pipe。如果是使用基于socket的连接方式,在使用proxy之前,需要调用manager对象的connect函数与远程的manager进程建 立连接。由于manager进程会打开端口接收该连接,因此必要的身份验证是需要的,否则任何人都可以连上manager弄乱你的共享对象。mp库通过 authkey的方式来进行身份验证。

在实现当中,manager进程通过multiprocessing.Manager类或者BaseManager的子类实现。 BaseManager提供了函数register注册一个函数来获取共享对象的proxy。这个函数会被客户进程调用,然后在manager进程当中执 行。这个函数可以返回一个共享的对象(对所有的调用返回同一个对象),或者可以为每一个调用创建一个新的对象,通过前者就可以实现多个进程共享一个对象。 关于这个的用法可以参考Python文档当中的例子“Demonstration of how to create and use customized managers and proxies”。

典型的导出一个共享对象的代码是:

ObjectType object_
class ObjectManager(multiprocessing.managers.BaseManager): pass
ObjectManager.register("object", lambda: object_)

注意上面介绍proxy对象的时候,我提到的“公共函数”四个字。每个proxy对象只会导出实际对象的公共函数。这里面有两个含义,一个是“公 共”,即所有非下划线开头的成员,另一个是“函数”,即所有callable的成员。这就带来一些限制,一是无法导出属性,二是无法导出一些公共的特殊函 数,例如__get__, __next__等等。对于这个mp库有一套处理,即自定义proxy对象。首先是BaseManager的register可以提供一个 proxy_type作为第三个参数,这个参数指定了哪些成员需要被导出。详细的使用方法可以参见文档当中的第一个例子。

另外manager还有一些细节的问题需要注意。由于Proxy对象不是线程安全的,因此如果需要在一个多线程程序当中使用proxy,mp库会为 每个线程创建一个proxy对象,而每个proxy对象都会对server process创建一个连接,而manager那边对于每个连接都创建一个单独的线程来为其服务。这样带来的问题就是,如果客户进程有很多线程,很容易会 导致manager进程的fd数目达到ulimit的限制,即使没有达到限制,也会因为manager进程当中有太多线程而严重影响manager的性 能。解决方案可以是一个进程内cache,只有一个单独的线程可以创建proxy对象访问共享对象,其余线程只能访问该进程当中的cache。

一旦manager因为达到ulimit限制或者其他异常,manager会直接退出,遗憾的是,这时候已经建立的proxy会试图重新连接 manager – 但是它已经不存在了。这个会导致客户进程hang在对proxy的函数调用上,这个时候,目前除了杀掉进程没有找到别的办法。

另外proxy使用socket的方式比较tricky,因此和内置的socket库有很多冲突,比如socket.setdefaulttimeout(Python Issue 6056 )。在setdefaulttimeout调用了之后,进程当中所有通过socket模块建立的socket都是被设置为unblock模式的,但是mp 库并不知道这一点,而且它总是假设socket都是block模式的,于是,一旦调用了setdefaulttimeout,所有对于proxy的函数调 用都会抛出OSError,错误代码为11,错误原因是非常有误导性的“Resource temporarily unavailable”,实际上就是EAGAIN。这个错误可以通过我提供的一个patch来补救(这个patch当中还包含其他的一些修复,所以请自行查看并修改该patch)。

由于以上的一些原因,server process模式作为一个对象的共享模式,能够提供最为灵活的共享方式,但是也有最多的问题。这个在使用过程当中就靠自己去衡量了。目前我们的系统对于 数据可靠性方面要求不高,丢失数据是可以接受的,但是也只用这种模式来维护统计值,不敢用来维护更多的东西。

继续讨论Python multiprocessing,这次讨论的主要内容是mp库的核心组件之一的Queue。

Queue是mp库当中用来提供多进程对象交换的方式。对象交换和上一部分当中提到的对象共享都是使多个进程访问同一个对象的方式,两者的区别就是,对象共享是多个进程访问同一个对象,对象交换则是将对象从一个进程传输的另一个进程。

multiprocessing当中的Queue使用方式和Python内置的threading.Queue对象很像,它支持一个put操作,将 对象放入Queue,也支持一个get操作,将对象从Queue当中读出。和threading.Queue不同的是,mp.Queue默认不支持 join()和task_done操作,这两个支持需要使用mp.JoinableQueue对象。

由于Queue对象负责进程之间的对象传输,因此第一个问题就是如何在两个进程之间共享这个Queue对象本身。在上一部分所言的三种共享方式当 中,Queue对象只能使用继承(inheritance)的方式共享。这是因为Queue本身基于unix的Pipe对象实现,而Pipe对象的共享需 要通过继承。因此,在一个典型的应用实现模型当中,应该是父进程创建Queue,然后创建子进程共享该Queue,由父进程和子进程分别读写。例如下面的 这个例子:

import multiprocessing

q = multiprocessing.Queue()

def reader_proc():
    print q.get()

reader = multiprocessing.Process(target=reader_proc)
reader.start()

q.put(100)
reader.join()

另一种实现方式是父进程创建Queue,创建多个子进程,有的子进程读Queue,有的子进程写Queue,例如:

import multiprocessing

q = multiprocessing.Queue()

def writer_proc():
    q.put(100)

def reader_proc():
    print q.get()

reader = multiprocessing.Process(target=reader_proc)
reader.start()
writer = multiprocessing.Process(target=writer_proc)
writer.start()

reader.join()
writer.join()

由于使用继承的方式共享Queue,因此代码当中并没有明显的传输Queue对象本身的代码,看起来似乎只要将multiprocessing当中 的对象换成threading当中的对象,程序仍然能够工作。反之,拿到一个现有的多线程程序,是不是将threading改成 multiprocessing就可以工作呢?也许可以,但是更可能的情况是你会遇到很多问题。

第一个问题就是mp的Queue需要考虑多进程之间的对象传输,因此所传输的对象必须是可以pickle的。否则,在Queue的put操作上会抛出PicklingError。

其他的一些差异表现在一些技术细节上,这些不是任何高层逻辑可以抽象掉的,不知道这些差异会导致一些潜在的错误,例如死锁。在总结这些潜在的犯错的 可能的同时,我们会简单看一下mp当中Queue的实现方式,以便能够方便的理解为什么会有这样的行为。这些实现问题仅仅针对Linux,Windows 上面的实现和出现的问题在这里不涉及。

mp.Queue建构在系统的Pipe之上,但是实际上进程并不是直接将对象写入到Pipe里面,而是先写入一个本地的buffer,再由一个专门 的feed线程将其放入Pipe当中。读取端则是直接从Pipe当中读出对象。之所以有这样一个feed线程,是为了能够提供Queue接口函数所需要的 put的超时控制。但是由于这个feed线程的存在,mp.Queue提供了几个额外的函数来控制它,一个函数close来停止该线程,以及 join_thread来join该线程。close同时负责把所有在buffer当中的对象刷新到Pipe当中。

但是这个feed线程也是个麻烦制造者,为了保证所有被放入Queue的东西最终都能够到达另外一端的进程,mp库注册了一个atexit的处理函 数,用来在进程退出的时候自动close并且join该feed线程。这个join动作带来了很多问题,比如潜在的死锁。考虑下面一种状况:一个父进程创 建了两个子进程,一个子进程读,另一个子进程写。当需要停止这些进程的时候,父进程如果先把读进程结束,但是同时写进程已经将太多的对象写入Queue, 导致后继的对象等待在buffer当中,则这个进程将无法终止,因为atexit的处理函数等待把所有buffer当中的对象放入Pipe,但是Pipe 已经满了,然后陷入了死锁。

有人可能会问,那只要保证总是按照数据流的顺序来停止进程不就行。问题是在很多复杂的系统流程当中,可能存在一个环形的数据流,这种情况下,无论按照什么顺序停止进程,终究有一个进程可能陷入这种情景当中。

幸运的是,Queue对象还提供了一个成员函数cancel_join_thread,这个函数可以使得在进程停止的时候不进行join操作,这样 可以避免死锁,代价就是这个时候尚未刷新到Pipe当中的对象都会丢失。鉴于即使调用了join_thread,残留在Pipe当中的对象仍然可能丢失, 所以一旦选择使用mp的Queue对象,就不要假设不会在流程当中丢对象了。

另外一个可能的方案是使用mp库当中的SimpleQueue对象。这个对象在文档当中没有提及,但是在multiprocessing.queue模块当中有定义。这个对象就是去掉了buffer的Queue对象,因此可能能够避免上面说的问题的。但是SimpleQueue没有提供put和get的超时处理,两个动作都是阻塞的。

除了使用multiprocessing.Queue,还可以使用multiprocessing.Pipe进行通信。mp.Pipe是Queue 的底层结构,但是没有feed线程和put/get的超时控制。一定程度上和SimpleQueue很像。需要注意的是Pipe带有一个参数 duplex,当设置为True(默认)的时候,Pipe并不是使用系统的pipe来实现,而是通过socketpair,即Unix Domain Socket来实现。这个和pipe相比有些微的性能差异。

另外一个使用Queue的方式不是mp库内置的。这种方式使用上一篇文章当中提到的server process的方式来共享一个Queue对象。这个Queue对象实际上在server process当中,所有的子进程通过socket连接到server process获取该Queue的代理对象进行操作。说到这有人会想起来mp库有一个内置的SyncManager对象, 可以通过multiprocess.Manager函数获取到,通过该对象的Queue方法可以获取一个Queue的代理对象。不幸的是,这个方法不是正 确的获取Queue的方式,原因正如上一篇文章所说,SyncManager.Queue方法的每次调用获取到的是一个新建对象的代理对象,而不是一个共 享对象。正确的使用server process当中的Queue的方式是:

共同部分:

import multiprocessing.managers as mpm
import Queue

class SharedQueueManager(mpm.BaseManager): pass
q = Queue.Queue()
SharedQueueManager.register('Queue', lambda: q)

服务进程:

mgr = SharedQueueManager(address=('', 12345))
server = mgr.get_server()
server.serve_forever()

客户进程:

mgr = SharedQueueManager(address=('localhost', 12345))
mgr.connect()
q = mgr.Queue() # 这里q就是共享的Queue对象的代理对象

这种方式比起mp库内置的Queue,有一些性能上的影响,因为毕竟牵涉到多次网络通讯,但是带来的好处是没有feed线程带来的一系列问题,而且 理论上不会存在丢数据的问题,除非server process崩溃。但是正如上一篇所说,server process本身就不是很靠谱的,因此这里也只是“理论上”不会丢数据而已。

说到性能,这里就列两个性能数据,以前在twitter上面提到过的(这两个连接无法访问的请联系我):

操作对象为 pickle后512字节的对象,通过proxy操作Queue的性能大约是7000次/秒(本机)或1100次/秒(多机),如果使用 multiprocessing.Queue,效率可达54000次/秒。

连接:

http://blog.ftofficer.com/2009/12/python-multiprocessing-2-object-sharing-across-process/

http://blog.ftofficer.com/2009/12/python-multiprocessing-3-about-queue/

作者:AngryFox 分类: Uncategorized May 27th, 2013 暂无评论
#coding: utf-8
import socket
import struct

def ip2int( ip ):
    #unpack !L 就是从network中unpack数据
    return struct.unpack('!L',socket.inet_aton(ip))[0]

if __name__ == '__main__':
    print ip2int( '192.168.100.1' )
    print ip2int( '192.168.100.21' )
    print ip2int( '192.168.100.200' )

"""
ping 3232261141
"""
作者:AngryFox 分类: Uncategorized May 27th, 2013 暂无评论
<?php
$fname = './FUCKYOURMOTHER.mp3';
$fp = fopen($fname,'rb');
$fsize = filesize($fname);
if (isset($_SERVER['HTTP_RANGE']) && ($_SERVER['HTTP_RANGE'] != "") &&
	preg_match("/^bytes=([0-9]+)-$/i", $_SERVER['HTTP_RANGE'], $match) && ($match[1] < $fsize)) {
		$start = $match[1];
	} else {
		$start = 0;
	}
	@header("Cache-control: public");
	@header("Pragma: public"); if ($star--> 0) {
	fseek($fp, $start);
	Header("HTTP/1.1 206 Partial Content");
	Header("Content-Length: " . ($fsize - $start));
	Header("Content-Ranges: bytes" . $start . "-" . ($fsize - 1) . "/" . $fsize);
} else {
	header("Content-Length: $fsize");
	Header("Accept-Ranges: bytes");
}
@header("Content-Type: application/octet-stream");
@header("Content-Disposition: attachment;filename=mmdld.mp3");
fpassthru($fp);
fpassthru();//函数输出文件指针处的所有剩余数据。
作者:AngryFox 分类: Uncategorized May 27th, 2013 暂无评论

1、轮询(默认)

每个请求按时间顺序逐一分配到不同的后端服务器,如果后端服务器down掉,能自动剔除。

2、weight
指定轮询几率,weight和访问比率成正比,用于后端服务器性能不均的情况。
例如:

upstream backend {
server 192.168.0.14 weight=10;
server 192.168.0.15 weight=10;
}

3、ip_hash
每个请求按访问ip的hash结果分配,这样每个访客固定访问一个后端服务器,可以解决session的问题。
例如:
upstream backend {
ip_hash;
server 192.168.0.14:88;
server 192.168.0.15:80;
}

4、fair(第三方)
按后端服务器的响应时间来分配请求,响应时间短的优先分配。

upstream backend {
server server1.linuxany.com;
server server2.linuxany.com;
fair;
}
5、url_hash(第三方)

按访问url的hash结果来分配请求,使每个url定向到同一个后端服务器,后端服务器为缓存时比较有效。

例:在upstream中加入hash语句,server语句中不能写入weight等其他的参数,hash_method是使用的hash算法

upstream backend {
server squid1:3128;
server squid2:3128;
hash $request_uri;
hash_method crc32;
}
#定义负载均衡设备的Ip及设备状态

upstream backend{
ip_hash;
server 127.0.0.1:9090 down;
server 127.0.0.1:8080 weight=2;
server 127.0.0.1:6060;
server 127.0.0.1:7070 backup;
}
在需要使用负载均衡的server中增加
proxy_pass http://bakend/;

每个设备的状态设置为:
1.down 表示单前的server暂时不参与负载
2.weight 默认为1.weight越大,负载的权重就越大。
3.max_fails :允许请求失败的次数默认为1.当超过最大次数时,返回proxy_next_upstream 模块定义的错误
4.fail_timeout:max_fails次失败后,暂停的时间。
5.backup: 其它所有的非backup机器down或者忙的时候,请求backup机器。所以这台机器压力会最轻。

nginx支持同时设置多组的负载均衡,用来给不用的server来使用。

client_body_in_file_only 设置为On 可以讲client post过来的数据记录到文件中用来做debug
client_body_temp_path 设置记录文件的目录 可以设置最多3层目录

location 对URL进行匹配.可以进行重定向或者进行新的代理 负载均衡

作者:AngryFox 分类: Uncategorized May 27th, 2013 暂无评论
function A(){
    var name = 'A';// 私有,不会被继承
    this.age = 12; // 公有,会被继承
    function aa(){}// 私有
    this.aa = function(){} // 公有
}
function B(){}
B.prototype = new A();
b = new B();
console.dir(b);

JS面向对象编程之对象访问控制实例

function Cat(name, age){
    var name = name;
    this.age = age||1;
    this.setName = function(sName){
        this.name = sName;
    }
    this.getName = function(){
        return this.name;
    }
    this.setAge = function(nAge){
        this.age = nAge;
    }
    this.getAge = function(){
        return this.age;
    }
}

Cat.prototype.say = function(){
    console.log('I am '+this.name+', I am '+this.age);
}

var kk = new Cat('kk');
console.log(kk.name);//undefined @private
kk.setName('deeka');
console.log(kk.getName()); // deeka
kk.setAge(2);
console.log(kk.getAge()); // 2
kk.age = 3;
console.log(kk.age);//3 @public
kk.say(); // I am deeka, I am 3