asyncore.py 简单使用

asyncore.py是用于处理异步soket的python标准库,典型的reactor设计,我们只需要实现handle_*的方法,handle_writer, handle_read, handle_connect 等方法。它的底层实现select/poll,我们调用时,面向socket的开发,与协议无关,所以实现http, memcache, spdy等都可以,甚至在unix下还有 async file i/o的子类。

asyncore.py内容分成三部分:
1. socket_map={}属性
2. def loop()方法
3. class dispatcher基类

其中socket_map和loop是全局的属性和方法,dispatcher类把我们的socket保存在socket_map中,socket_map的k,v为socket.fileno()和dispatcher,dispatcher包含socket属性。我们的loop方法轮询这个map根据子类实现的handle_writable/handle_readable的标志调用对应的handle_write/handle_read方法。结束后调用close()方法关闭链接。

    def close(self):
        self.connected = False
        self.accepting = False
        self.del_channel()
        try:
            self.socket.close()
        except socket.error, why:
            if why.args[0] not in (ENOTCONN, EBADF):
                raise

在docs.python.org上的最后是使用的例子:第一个是http 1.0客户端的实现

import asyncore, socket

class HTTPClient(asyncore.dispatcher):

    def __init__(self, host, path):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM) #建立socket
        self.connect( (host, 80) ) #链接 http://www.python.org:80
        self.buffer = ‘GET %s HTTP/1.0\r\n\r\n’ % path #实现http协议,需要发送的数据

    def handle_connect(self):
        pass

    def handle_close(self):
        self.close()

    def handle_read(self):
        print self.recv(8192) #接受数据,内部实现socket.recv(),与socket.send()配合使用

    def writable(self):
        return (len(self.buffer) > 0) #发送数据的条件,当buffer为空时调用handle_close()

    def handle_write(self):
        sent = self.send(self.buffer) #发送数据
        self.buffer = self.buffer[sent:] #清空发送,buffer

client = HTTPClient(‘www.python.org’, ‘/’)
asyncore.loop()

上面没有实现handle_readable,默认发送后3s开始接收。
Technorati Tags: ,

java.util.LinkedHashMap 实现LRU Cache

java.util.LinkedHashMap 是一个有趣的类,它用HashMap和LinkedList的方式实现了Map接口。entries是一个非阻塞的双向环形链表,在读写entry的时候会修改entries list的顺序,每次把操作的entry放到list的head上,这样整个list的顺序就是按照最近最少访问,到最近最多访问的顺序排列。

下面是entry的实现,是整个LinkedHashMap的核心

    /**
     * LinkedHashMap entry.
     */
    private static class Entry<K,V> extends HashMap.Entry<K,V> {
        // These fields comprise the doubly linked list used for iteration.
        Entry<K,V> before, after; //双向引用

        Entry(int hash, K key, V value, HashMap.Entry<K,V> next) {
            super(hash, key, value, next);
        }

        /**
         * Removes this entry from the linked list.
         */
        private void remove() {
            before.after = after; after.before = before;
        }

        /**
         * Inserts this entry before the specified existing entry in the list.
         */
        private void addBefore(Entry<K,V> existingEntry) {
            after  = existingEntry; before = existingEntry.before;
            before.after = this; after.before = this;
        }

        /**
         * This method is invoked by the superclass whenever the value
         * of a pre-existing entry is read by Map.get or modified by Map.set.
         * If the enclosing Map is access-ordered, it moves the entry
         * to the end of the list; otherwise, it does nothing.
         */
        void recordAccess(HashMap<K,V> m) {//访问enties的方法切面
            LinkedHashMap<K,V> lm = (LinkedHashMap<K,V>)m;
            if (lm.accessOrder) { //是否需要进行LRU
                lm.modCount++; //修改链表次数,因为这个操作在非阻塞情况下进行,在做遍历时需要根据这个次数判断是否遭到修改
                remove();//把自己从队列中移除
                addBefore(lm.header);//放到最前面
            }
        }

        void recordRemoval(HashMap<K,V> m) {
            remove();
        }
    }

java.util.LinkedHashMap.removeEldestEntry(Entry<K, V>) 这个protected方法可以让我们重载,帮助我们删除最少被访问的entry,这样就可以实现一个空间固定的cache。

    protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
        return false;
    }

重载后,只要实现这段代码其实你的LRU Cache就完成了:

            @Override
            protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
                return this.size() > LRUCache.this.maxCacheSize;
            }

虽然他叫remove其实并没有remove的职责,这点请注意。remove的操作在LinkedHashMap在其他地方已经实现了。

    /**
     * This override alters behavior of superclass put method. It causes newly
     * allocated entry to get inserted at the end of the linked list and
     * removes the eldest entry if appropriate.
     */
    void addEntry(int hash, K key, V value, int bucketIndex) {
        createEntry(hash, key, value, bucketIndex);

        // Remove eldest entry if instructed, else grow capacity if appropriate
        Entry<K,V> eldest = header.after; //因为是个环形
        if (removeEldestEntry(eldest)) { //每次增加新数据的时候都会判断是不是满了。
            removeEntryForKey(eldest.key);
        } else {
            if (size >= threshold)
                resize(2 * table.length);
        }
    }

一个完成LRU Cache的例子:http://www.source-code.biz/snippets/java/6.htm
这个本地的LRU Cache配合上一篇日志的按照用户标志拆分web server,实现提高本地缓存命中率的目的。

用nginx实现按照用户标志hash拆分web server

大型网站的数据存储拆分中比较通用的一种方式是按照用户id的hash值进行水平拆分,这种做法可以有效的较低db的读写压力,增加数据IO处理能力。

为了满足稳定性的要求,一般的web server都不会只有一台,并且大部分的web server都是按照uri/url进行业务拆分,这样可以增加web server cpu,men等资源的使用效率。可是如果能在web server上可以进一步按照userid hash拆分的话,就可以适当的增加web server的内存级别的缓存,可以有效的利用web server的内存使用效率,并且提高访问数据,增加web server网络的io吞吐率。

HttpUpstreamConsistentHash 是基于nginx的一个模块,需要在编译的时候安装进去

./configure –prefix=/home/nginx   –with-pcre=/tmp/pcre-8.01 –with-http_stub_status_module –with-http_ssl_module –without-http_rewrite_module –add-module=/tmp/nginx_upstream_hash-0.3 –add-module=/tmp/replay-ngx_http_consistent_hash-77b6940
make && make install

这个模块的作用是根据的uri或cookie等信息的hash code 对后端的机器进行分发请求:

upstream somestream {
consistent_hash $request_uri;
server 10.50.1.3:11211;
server 10.50.1.4:11211;
server 10.50.1.5:11211;
}

上面是nginx doc上的例子,根据uri进行负载均衡。
如果需要根据用户的身份进行负载可以根据“身份cookie”这个元素写到配置文件中,绝大部分的大型网站都不会使用session这个功能去保存用户登陆状态,而是利用cookie中的kv存储去记录用户信息。我们则可以利用这个用户身份的COOKIE去完成我们的需求。

upstream somestream {
consistent_hash $COOKIE_ticket;
server 10.50.1.3:8080;
server 10.50.1.4:8081;
server 10.50.1.5:8082;
}

虽然我们不知道用户散到了哪台web server上面,但是我们运行一段时间,就可以把访问这台机器的用户信息cache到内存中,这样之后用户再次访问时就减少了部分的rpc调用,增加了我们的运行效率。