diff options
Diffstat (limited to 'net/sunrpc/svcsock.c')
-rw-r--r-- | net/sunrpc/svcsock.c | 426 |
1 files changed, 296 insertions, 130 deletions
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index 5b0fe1b66a2..b39e7e2b648 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -31,6 +31,7 @@ #include <linux/slab.h> #include <linux/netdevice.h> #include <linux/skbuff.h> +#include <linux/file.h> #include <net/sock.h> #include <net/checksum.h> #include <net/ip.h> @@ -45,13 +46,16 @@ /* SMP locking strategy: * - * svc_serv->sv_lock protects most stuff for that service. + * svc_pool->sp_lock protects most of the fields of that pool. + * svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt. + * when both need to be taken (rare), svc_serv->sv_lock is first. + * BKL protects svc_serv->sv_nrthread. + * svc_sock->sk_defer_lock protects the svc_sock->sk_deferred list + * svc_sock->sk_flags.SK_BUSY prevents a svc_sock being enqueued multiply. * * Some flags can be set to certain values at any time * providing that certain rules are followed: * - * SK_BUSY can be set to 0 at any time. - * svc_sock_enqueue must be called afterwards * SK_CONN, SK_DATA, can be set or cleared at any time. * after a set, svc_sock_enqueue must be called. * after a clear, the socket must be read/accepted @@ -73,23 +77,30 @@ static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk); static int svc_deferred_recv(struct svc_rqst *rqstp); static struct cache_deferred_req *svc_defer(struct cache_req *req); +/* apparently the "standard" is that clients close + * idle connections after 5 minutes, servers after + * 6 minutes + * http://www.connectathon.org/talks96/nfstcp.pdf + */ +static int svc_conn_age_period = 6*60; + /* - * Queue up an idle server thread. Must have serv->sv_lock held. + * Queue up an idle server thread. Must have pool->sp_lock held. * Note: this is really a stack rather than a queue, so that we only - * use as many different threads as we need, and the rest don't polute + * use as many different threads as we need, and the rest don't pollute * the cache. */ static inline void -svc_serv_enqueue(struct svc_serv *serv, struct svc_rqst *rqstp) +svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp) { - list_add(&rqstp->rq_list, &serv->sv_threads); + list_add(&rqstp->rq_list, &pool->sp_threads); } /* - * Dequeue an nfsd thread. Must have serv->sv_lock held. + * Dequeue an nfsd thread. Must have pool->sp_lock held. */ static inline void -svc_serv_dequeue(struct svc_serv *serv, struct svc_rqst *rqstp) +svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp) { list_del(&rqstp->rq_list); } @@ -140,7 +151,9 @@ static void svc_sock_enqueue(struct svc_sock *svsk) { struct svc_serv *serv = svsk->sk_server; + struct svc_pool *pool; struct svc_rqst *rqstp; + int cpu; if (!(svsk->sk_flags & ( (1<<SK_CONN)|(1<<SK_DATA)|(1<<SK_CLOSE)|(1<<SK_DEFERRED)) )) @@ -148,10 +161,14 @@ svc_sock_enqueue(struct svc_sock *svsk) if (test_bit(SK_DEAD, &svsk->sk_flags)) return; - spin_lock_bh(&serv->sv_lock); + cpu = get_cpu(); + pool = svc_pool_for_cpu(svsk->sk_server, cpu); + put_cpu(); + + spin_lock_bh(&pool->sp_lock); - if (!list_empty(&serv->sv_threads) && - !list_empty(&serv->sv_sockets)) + if (!list_empty(&pool->sp_threads) && + !list_empty(&pool->sp_sockets)) printk(KERN_ERR "svc_sock_enqueue: threads and sockets both waiting??\n"); @@ -161,73 +178,79 @@ svc_sock_enqueue(struct svc_sock *svsk) goto out_unlock; } - if (test_bit(SK_BUSY, &svsk->sk_flags)) { - /* Don't enqueue socket while daemon is receiving */ + /* Mark socket as busy. It will remain in this state until the + * server has processed all pending data and put the socket back + * on the idle list. We update SK_BUSY atomically because + * it also guards against trying to enqueue the svc_sock twice. + */ + if (test_and_set_bit(SK_BUSY, &svsk->sk_flags)) { + /* Don't enqueue socket while already enqueued */ dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk); goto out_unlock; } + BUG_ON(svsk->sk_pool != NULL); + svsk->sk_pool = pool; set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags); - if (((svsk->sk_reserved + serv->sv_bufsz)*2 + if (((atomic_read(&svsk->sk_reserved) + serv->sv_bufsz)*2 > svc_sock_wspace(svsk)) && !test_bit(SK_CLOSE, &svsk->sk_flags) && !test_bit(SK_CONN, &svsk->sk_flags)) { /* Don't enqueue while not enough space for reply */ dprintk("svc: socket %p no space, %d*2 > %ld, not enqueued\n", - svsk->sk_sk, svsk->sk_reserved+serv->sv_bufsz, + svsk->sk_sk, atomic_read(&svsk->sk_reserved)+serv->sv_bufsz, svc_sock_wspace(svsk)); + svsk->sk_pool = NULL; + clear_bit(SK_BUSY, &svsk->sk_flags); goto out_unlock; } clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags); - /* Mark socket as busy. It will remain in this state until the - * server has processed all pending data and put the socket back - * on the idle list. - */ - set_bit(SK_BUSY, &svsk->sk_flags); - if (!list_empty(&serv->sv_threads)) { - rqstp = list_entry(serv->sv_threads.next, + if (!list_empty(&pool->sp_threads)) { + rqstp = list_entry(pool->sp_threads.next, struct svc_rqst, rq_list); dprintk("svc: socket %p served by daemon %p\n", svsk->sk_sk, rqstp); - svc_serv_dequeue(serv, rqstp); + svc_thread_dequeue(pool, rqstp); if (rqstp->rq_sock) printk(KERN_ERR "svc_sock_enqueue: server %p, rq_sock=%p!\n", rqstp, rqstp->rq_sock); rqstp->rq_sock = svsk; - svsk->sk_inuse++; + atomic_inc(&svsk->sk_inuse); rqstp->rq_reserved = serv->sv_bufsz; - svsk->sk_reserved += rqstp->rq_reserved; + atomic_add(rqstp->rq_reserved, &svsk->sk_reserved); + BUG_ON(svsk->sk_pool != pool); wake_up(&rqstp->rq_wait); } else { dprintk("svc: socket %p put into queue\n", svsk->sk_sk); - list_add_tail(&svsk->sk_ready, &serv->sv_sockets); + list_add_tail(&svsk->sk_ready, &pool->sp_sockets); + BUG_ON(svsk->sk_pool != pool); } out_unlock: - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&pool->sp_lock); } /* - * Dequeue the first socket. Must be called with the serv->sv_lock held. + * Dequeue the first socket. Must be called with the pool->sp_lock held. */ static inline struct svc_sock * -svc_sock_dequeue(struct svc_serv *serv) +svc_sock_dequeue(struct svc_pool *pool) { struct svc_sock *svsk; - if (list_empty(&serv->sv_sockets)) + if (list_empty(&pool->sp_sockets)) return NULL; - svsk = list_entry(serv->sv_sockets.next, + svsk = list_entry(pool->sp_sockets.next, struct svc_sock, sk_ready); list_del_init(&svsk->sk_ready); dprintk("svc: socket %p dequeued, inuse=%d\n", - svsk->sk_sk, svsk->sk_inuse); + svsk->sk_sk, atomic_read(&svsk->sk_inuse)); return svsk; } @@ -241,6 +264,7 @@ svc_sock_dequeue(struct svc_serv *serv) static inline void svc_sock_received(struct svc_sock *svsk) { + svsk->sk_pool = NULL; clear_bit(SK_BUSY, &svsk->sk_flags); svc_sock_enqueue(svsk); } @@ -262,10 +286,8 @@ void svc_reserve(struct svc_rqst *rqstp, int space) if (space < rqstp->rq_reserved) { struct svc_sock *svsk = rqstp->rq_sock; - spin_lock_bh(&svsk->sk_server->sv_lock); - svsk->sk_reserved -= (rqstp->rq_reserved - space); + atomic_sub((rqstp->rq_reserved - space), &svsk->sk_reserved); rqstp->rq_reserved = space; - spin_unlock_bh(&svsk->sk_server->sv_lock); svc_sock_enqueue(svsk); } @@ -277,17 +299,11 @@ void svc_reserve(struct svc_rqst *rqstp, int space) static inline void svc_sock_put(struct svc_sock *svsk) { - struct svc_serv *serv = svsk->sk_server; - - spin_lock_bh(&serv->sv_lock); - if (!--(svsk->sk_inuse) && test_bit(SK_DEAD, &svsk->sk_flags)) { - spin_unlock_bh(&serv->sv_lock); + if (atomic_dec_and_test(&svsk->sk_inuse) && test_bit(SK_DEAD, &svsk->sk_flags)) { dprintk("svc: releasing dead socket\n"); sock_release(svsk->sk_sock); kfree(svsk); } - else - spin_unlock_bh(&serv->sv_lock); } static void @@ -297,7 +313,7 @@ svc_sock_release(struct svc_rqst *rqstp) svc_release_skb(rqstp); - svc_free_allpages(rqstp); + svc_free_res_pages(rqstp); rqstp->rq_res.page_len = 0; rqstp->rq_res.page_base = 0; @@ -321,25 +337,33 @@ svc_sock_release(struct svc_rqst *rqstp) /* * External function to wake up a server waiting for data + * This really only makes sense for services like lockd + * which have exactly one thread anyway. */ void svc_wake_up(struct svc_serv *serv) { struct svc_rqst *rqstp; - - spin_lock_bh(&serv->sv_lock); - if (!list_empty(&serv->sv_threads)) { - rqstp = list_entry(serv->sv_threads.next, - struct svc_rqst, - rq_list); - dprintk("svc: daemon %p woken up.\n", rqstp); - /* - svc_serv_dequeue(serv, rqstp); - rqstp->rq_sock = NULL; - */ - wake_up(&rqstp->rq_wait); + unsigned int i; + struct svc_pool *pool; + + for (i = 0; i < serv->sv_nrpools; i++) { + pool = &serv->sv_pools[i]; + + spin_lock_bh(&pool->sp_lock); + if (!list_empty(&pool->sp_threads)) { + rqstp = list_entry(pool->sp_threads.next, + struct svc_rqst, + rq_list); + dprintk("svc: daemon %p woken up.\n", rqstp); + /* + svc_thread_dequeue(pool, rqstp); + rqstp->rq_sock = NULL; + */ + wake_up(&rqstp->rq_wait); + } + spin_unlock_bh(&pool->sp_lock); } - spin_unlock_bh(&serv->sv_lock); } /* @@ -388,7 +412,8 @@ svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr) /* send head */ if (slen == xdr->head[0].iov_len) flags = 0; - len = kernel_sendpage(sock, rqstp->rq_respages[0], 0, xdr->head[0].iov_len, flags); + len = kernel_sendpage(sock, rqstp->rq_respages[0], 0, + xdr->head[0].iov_len, flags); if (len != xdr->head[0].iov_len) goto out; slen -= xdr->head[0].iov_len; @@ -413,8 +438,9 @@ svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr) } /* send tail */ if (xdr->tail[0].iov_len) { - result = kernel_sendpage(sock, rqstp->rq_respages[rqstp->rq_restailpage], - ((unsigned long)xdr->tail[0].iov_base)& (PAGE_SIZE-1), + result = kernel_sendpage(sock, rqstp->rq_respages[0], + ((unsigned long)xdr->tail[0].iov_base) + & (PAGE_SIZE-1), xdr->tail[0].iov_len, 0); if (result > 0) @@ -429,6 +455,56 @@ out: } /* + * Report socket names for nfsdfs + */ +static int one_sock_name(char *buf, struct svc_sock *svsk) +{ + int len; + + switch(svsk->sk_sk->sk_family) { + case AF_INET: + len = sprintf(buf, "ipv4 %s %u.%u.%u.%u %d\n", + svsk->sk_sk->sk_protocol==IPPROTO_UDP? + "udp" : "tcp", + NIPQUAD(inet_sk(svsk->sk_sk)->rcv_saddr), + inet_sk(svsk->sk_sk)->num); + break; + default: + len = sprintf(buf, "*unknown-%d*\n", + svsk->sk_sk->sk_family); + } + return len; +} + +int +svc_sock_names(char *buf, struct svc_serv *serv, char *toclose) +{ + struct svc_sock *svsk, *closesk = NULL; + int len = 0; + + if (!serv) + return 0; + spin_lock(&serv->sv_lock); + list_for_each_entry(svsk, &serv->sv_permsocks, sk_list) { + int onelen = one_sock_name(buf+len, svsk); + if (toclose && strcmp(toclose, buf+len) == 0) + closesk = svsk; + else + len += onelen; + } + spin_unlock(&serv->sv_lock); + if (closesk) + /* Should unregister with portmap, but you cannot + * unregister just one protocol... + */ + svc_delete_socket(closesk); + else if (toclose) + return -ENOENT; + return len; +} +EXPORT_SYMBOL(svc_sock_names); + +/* * Check input queue length */ static int @@ -557,7 +633,10 @@ svc_udp_recvfrom(struct svc_rqst *rqstp) /* udp sockets need large rcvbuf as all pending * requests are still in that buffer. sndbuf must * also be large enough that there is enough space - * for one reply per thread. + * for one reply per thread. We count all threads + * rather than threads in a particular pool, which + * provides an upper bound on the number of threads + * which will access the socket. */ svc_sock_setbufsize(svsk->sk_sock, (serv->sv_nrthreads+3) * serv->sv_bufsz, @@ -631,9 +710,11 @@ svc_udp_recvfrom(struct svc_rqst *rqstp) if (len <= rqstp->rq_arg.head[0].iov_len) { rqstp->rq_arg.head[0].iov_len = len; rqstp->rq_arg.page_len = 0; + rqstp->rq_respages = rqstp->rq_pages+1; } else { rqstp->rq_arg.page_len = len - rqstp->rq_arg.head[0].iov_len; - rqstp->rq_argused += (rqstp->rq_arg.page_len + PAGE_SIZE - 1)/ PAGE_SIZE; + rqstp->rq_respages = rqstp->rq_pages + 1 + + (rqstp->rq_arg.page_len + PAGE_SIZE - 1)/ PAGE_SIZE; } if (serv->sv_stats) @@ -844,7 +925,7 @@ svc_tcp_accept(struct svc_sock *svsk) struct svc_sock, sk_list); set_bit(SK_CLOSE, &svsk->sk_flags); - svsk->sk_inuse ++; + atomic_inc(&svsk->sk_inuse); } spin_unlock_bh(&serv->sv_lock); @@ -874,7 +955,7 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp) struct svc_sock *svsk = rqstp->rq_sock; struct svc_serv *serv = svsk->sk_server; int len; - struct kvec vec[RPCSVC_MAXPAGES]; + struct kvec *vec; int pnum, vlen; dprintk("svc: tcp_recv %p data %d conn %d close %d\n", @@ -902,6 +983,11 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp) /* sndbuf needs to have room for one request * per thread, otherwise we can stall even when the * network isn't a bottleneck. + * + * We count all threads rather than threads in a + * particular pool, which provides an upper bound + * on the number of threads which will access the socket. + * * rcvbuf just needs to be able to hold a few requests. * Normally they will be removed from the queue * as soon a a complete request arrives. @@ -967,15 +1053,17 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp) len = svsk->sk_reclen; set_bit(SK_DATA, &svsk->sk_flags); + vec = rqstp->rq_vec; vec[0] = rqstp->rq_arg.head[0]; vlen = PAGE_SIZE; pnum = 1; while (vlen < len) { - vec[pnum].iov_base = page_address(rqstp->rq_argpages[rqstp->rq_argused++]); + vec[pnum].iov_base = page_address(rqstp->rq_pages[pnum]); vec[pnum].iov_len = PAGE_SIZE; pnum++; vlen += PAGE_SIZE; } + rqstp->rq_respages = &rqstp->rq_pages[pnum]; /* Now receive data */ len = svc_recvfrom(rqstp, vec, pnum, len); @@ -1117,13 +1205,17 @@ svc_sock_update_bufs(struct svc_serv *serv) } /* - * Receive the next request on any socket. + * Receive the next request on any socket. This code is carefully + * organised not to touch any cachelines in the shared svc_serv + * structure, only cachelines in the local svc_pool. */ int -svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout) +svc_recv(struct svc_rqst *rqstp, long timeout) { struct svc_sock *svsk =NULL; - int len; + struct svc_serv *serv = rqstp->rq_server; + struct svc_pool *pool = rqstp->rq_pool; + int len, i; int pages; struct xdr_buf *arg; DECLARE_WAITQUEUE(wait, current); @@ -1140,27 +1232,22 @@ svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout) "svc_recv: service %p, wait queue active!\n", rqstp); - /* Initialize the buffers */ - /* first reclaim pages that were moved to response list */ - svc_pushback_allpages(rqstp); /* now allocate needed pages. If we get a failure, sleep briefly */ pages = 2 + (serv->sv_bufsz + PAGE_SIZE -1) / PAGE_SIZE; - while (rqstp->rq_arghi < pages) { - struct page *p = alloc_page(GFP_KERNEL); - if (!p) { - schedule_timeout_uninterruptible(msecs_to_jiffies(500)); - continue; + for (i=0; i < pages ; i++) + while (rqstp->rq_pages[i] == NULL) { + struct page *p = alloc_page(GFP_KERNEL); + if (!p) + schedule_timeout_uninterruptible(msecs_to_jiffies(500)); + rqstp->rq_pages[i] = p; } - rqstp->rq_argpages[rqstp->rq_arghi++] = p; - } /* Make arg->head point to first page and arg->pages point to rest */ arg = &rqstp->rq_arg; - arg->head[0].iov_base = page_address(rqstp->rq_argpages[0]); + arg->head[0].iov_base = page_address(rqstp->rq_pages[0]); arg->head[0].iov_len = PAGE_SIZE; - rqstp->rq_argused = 1; - arg->pages = rqstp->rq_argpages + 1; + arg->pages = rqstp->rq_pages + 1; arg->page_base = 0; /* save at least one page for response */ arg->page_len = (pages-2)*PAGE_SIZE; @@ -1172,32 +1259,15 @@ svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout) if (signalled()) return -EINTR; - spin_lock_bh(&serv->sv_lock); - if (!list_empty(&serv->sv_tempsocks)) { - svsk = list_entry(serv->sv_tempsocks.next, - struct svc_sock, sk_list); - /* apparently the "standard" is that clients close - * idle connections after 5 minutes, servers after - * 6 minutes - * http://www.connectathon.org/talks96/nfstcp.pdf - */ - if (get_seconds() - svsk->sk_lastrecv < 6*60 - || test_bit(SK_BUSY, &svsk->sk_flags)) - svsk = NULL; - } - if (svsk) { - set_bit(SK_BUSY, &svsk->sk_flags); - set_bit(SK_CLOSE, &svsk->sk_flags); - rqstp->rq_sock = svsk; - svsk->sk_inuse++; - } else if ((svsk = svc_sock_dequeue(serv)) != NULL) { + spin_lock_bh(&pool->sp_lock); + if ((svsk = svc_sock_dequeue(pool)) != NULL) { rqstp->rq_sock = svsk; - svsk->sk_inuse++; + atomic_inc(&svsk->sk_inuse); rqstp->rq_reserved = serv->sv_bufsz; - svsk->sk_reserved += rqstp->rq_reserved; + atomic_add(rqstp->rq_reserved, &svsk->sk_reserved); } else { /* No data pending. Go to sleep */ - svc_serv_enqueue(serv, rqstp); + svc_thread_enqueue(pool, rqstp); /* * We have to be able to interrupt this wait @@ -1205,26 +1275,26 @@ svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout) */ set_current_state(TASK_INTERRUPTIBLE); add_wait_queue(&rqstp->rq_wait, &wait); - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&pool->sp_lock); schedule_timeout(timeout); try_to_freeze(); - spin_lock_bh(&serv->sv_lock); + spin_lock_bh(&pool->sp_lock); remove_wait_queue(&rqstp->rq_wait, &wait); if (!(svsk = rqstp->rq_sock)) { - svc_serv_dequeue(serv, rqstp); - spin_unlock_bh(&serv->sv_lock); + svc_thread_dequeue(pool, rqstp); + spin_unlock_bh(&pool->sp_lock); dprintk("svc: server %p, no data yet\n", rqstp); return signalled()? -EINTR : -EAGAIN; } } - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&pool->sp_lock); - dprintk("svc: server %p, socket %p, inuse=%d\n", - rqstp, svsk, svsk->sk_inuse); + dprintk("svc: server %p, pool %u, socket %p, inuse=%d\n", + rqstp, pool->sp_id, svsk, atomic_read(&svsk->sk_inuse)); len = svsk->sk_recvfrom(rqstp); dprintk("svc: got len=%d\n", len); @@ -1235,13 +1305,7 @@ svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout) return -EAGAIN; } svsk->sk_lastrecv = get_seconds(); - if (test_bit(SK_TEMP, &svsk->sk_flags)) { - /* push active sockets to end of list */ - spin_lock_bh(&serv->sv_lock); - if (!list_empty(&svsk->sk_list)) - list_move_tail(&svsk->sk_list, &serv->sv_tempsocks); - spin_unlock_bh(&serv->sv_lock); - } + clear_bit(SK_OLD, &svsk->sk_flags); rqstp->rq_secure = ntohs(rqstp->rq_addr.sin_port) < 1024; rqstp->rq_chandle.defer = svc_defer; @@ -1301,6 +1365,58 @@ svc_send(struct svc_rqst *rqstp) } /* + * Timer function to close old temporary sockets, using + * a mark-and-sweep algorithm. + */ +static void +svc_age_temp_sockets(unsigned long closure) +{ + struct svc_serv *serv = (struct svc_serv *)closure; + struct svc_sock *svsk; + struct list_head *le, *next; + LIST_HEAD(to_be_aged); + + dprintk("svc_age_temp_sockets\n"); + + if (!spin_trylock_bh(&serv->sv_lock)) { + /* busy, try again 1 sec later */ + dprintk("svc_age_temp_sockets: busy\n"); + mod_timer(&serv->sv_temptimer, jiffies + HZ); + return; + } + + list_for_each_safe(le, next, &serv->sv_tempsocks) { + svsk = list_entry(le, struct svc_sock, sk_list); + + if (!test_and_set_bit(SK_OLD, &svsk->sk_flags)) + continue; + if (atomic_read(&svsk->sk_inuse) || test_bit(SK_BUSY, &svsk->sk_flags)) + continue; + atomic_inc(&svsk->sk_inuse); + list_move(le, &to_be_aged); + set_bit(SK_CLOSE, &svsk->sk_flags); + set_bit(SK_DETACHED, &svsk->sk_flags); + } + spin_unlock_bh(&serv->sv_lock); + + while (!list_empty(&to_be_aged)) { + le = to_be_aged.next; + /* fiddling the sk_list node is safe 'cos we're SK_DETACHED */ + list_del_init(le); + svsk = list_entry(le, struct svc_sock, sk_list); + + dprintk("queuing svsk %p for closing, %lu seconds old\n", + svsk, get_seconds() - svsk->sk_lastrecv); + + /* a thread will dequeue and close it soon */ + svc_sock_enqueue(svsk); + svc_sock_put(svsk); + } + + mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ); +} + +/* * Initialize socket for RPC use and create svc_sock struct * XXX: May want to setsockopt SO_SNDBUF and SO_RCVBUF. */ @@ -1337,7 +1453,9 @@ svc_setup_socket(struct svc_serv *serv, struct socket *sock, svsk->sk_odata = inet->sk_data_ready; svsk->sk_owspace = inet->sk_write_space; svsk->sk_server = serv; + atomic_set(&svsk->sk_inuse, 0); svsk->sk_lastrecv = get_seconds(); + spin_lock_init(&svsk->sk_defer_lock); INIT_LIST_HEAD(&svsk->sk_deferred); INIT_LIST_HEAD(&svsk->sk_ready); mutex_init(&svsk->sk_mutex); @@ -1353,6 +1471,13 @@ svc_setup_socket(struct svc_serv *serv, struct socket *sock, set_bit(SK_TEMP, &svsk->sk_flags); list_add(&svsk->sk_list, &serv->sv_tempsocks); serv->sv_tmpcnt++; + if (serv->sv_temptimer.function == NULL) { + /* setup timer to age temp sockets */ + setup_timer(&serv->sv_temptimer, svc_age_temp_sockets, + (unsigned long)serv); + mod_timer(&serv->sv_temptimer, + jiffies + svc_conn_age_period * HZ); + } } else { clear_bit(SK_TEMP, &svsk->sk_flags); list_add(&svsk->sk_list, &serv->sv_permsocks); @@ -1367,6 +1492,38 @@ svc_setup_socket(struct svc_serv *serv, struct socket *sock, return svsk; } +int svc_addsock(struct svc_serv *serv, + int fd, + char *name_return, + int *proto) +{ + int err = 0; + struct socket *so = sockfd_lookup(fd, &err); + struct svc_sock *svsk = NULL; + + if (!so) + return err; + if (so->sk->sk_family != AF_INET) + err = -EAFNOSUPPORT; + else if (so->sk->sk_protocol != IPPROTO_TCP && + so->sk->sk_protocol != IPPROTO_UDP) + err = -EPROTONOSUPPORT; + else if (so->state > SS_UNCONNECTED) + err = -EISCONN; + else { + svsk = svc_setup_socket(serv, so, &err, 1); + if (svsk) + err = 0; + } + if (err) { + sockfd_put(so); + return err; + } + if (proto) *proto = so->sk->sk_protocol; + return one_sock_name(name_return, svsk); +} +EXPORT_SYMBOL_GPL(svc_addsock); + /* * Create socket for RPC service. */ @@ -1434,15 +1591,27 @@ svc_delete_socket(struct svc_sock *svsk) spin_lock_bh(&serv->sv_lock); - list_del_init(&svsk->sk_list); - list_del_init(&svsk->sk_ready); + if (!test_and_set_bit(SK_DETACHED, &svsk->sk_flags)) + list_del_init(&svsk->sk_list); + /* + * We used to delete the svc_sock from whichever list + * it's sk_ready node was on, but we don't actually + * need to. This is because the only time we're called + * while still attached to a queue, the queue itself + * is about to be destroyed (in svc_destroy). + */ if (!test_and_set_bit(SK_DEAD, &svsk->sk_flags)) if (test_bit(SK_TEMP, &svsk->sk_flags)) serv->sv_tmpcnt--; - if (!svsk->sk_inuse) { + if (!atomic_read(&svsk->sk_inuse)) { spin_unlock_bh(&serv->sv_lock); - sock_release(svsk->sk_sock); + if (svsk->sk_sock->file) + sockfd_put(svsk->sk_sock); + else + sock_release(svsk->sk_sock); + if (svsk->sk_info_authunix != NULL) + svcauth_unix_info_release(svsk->sk_info_authunix); kfree(svsk); } else { spin_unlock_bh(&serv->sv_lock); @@ -1473,7 +1642,6 @@ svc_makesock(struct svc_serv *serv, int protocol, unsigned short port) static void svc_revisit(struct cache_deferred_req *dreq, int too_many) { struct svc_deferred_req *dr = container_of(dreq, struct svc_deferred_req, handle); - struct svc_serv *serv = dreq->owner; struct svc_sock *svsk; if (too_many) { @@ -1484,9 +1652,9 @@ static void svc_revisit(struct cache_deferred_req *dreq, int too_many) dprintk("revisit queued\n"); svsk = dr->svsk; dr->svsk = NULL; - spin_lock_bh(&serv->sv_lock); + spin_lock_bh(&svsk->sk_defer_lock); list_add(&dr->handle.recent, &svsk->sk_deferred); - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&svsk->sk_defer_lock); set_bit(SK_DEFERRED, &svsk->sk_flags); svc_sock_enqueue(svsk); svc_sock_put(svsk); @@ -1518,10 +1686,8 @@ svc_defer(struct cache_req *req) dr->argslen = rqstp->rq_arg.len >> 2; memcpy(dr->args, rqstp->rq_arg.head[0].iov_base-skip, dr->argslen<<2); } - spin_lock_bh(&rqstp->rq_server->sv_lock); - rqstp->rq_sock->sk_inuse++; + atomic_inc(&rqstp->rq_sock->sk_inuse); dr->svsk = rqstp->rq_sock; - spin_unlock_bh(&rqstp->rq_server->sv_lock); dr->handle.revisit = svc_revisit; return &dr->handle; @@ -1541,6 +1707,7 @@ static int svc_deferred_recv(struct svc_rqst *rqstp) rqstp->rq_prot = dr->prot; rqstp->rq_addr = dr->addr; rqstp->rq_daddr = dr->daddr; + rqstp->rq_respages = rqstp->rq_pages; return dr->argslen<<2; } @@ -1548,11 +1715,10 @@ static int svc_deferred_recv(struct svc_rqst *rqstp) static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk) { struct svc_deferred_req *dr = NULL; - struct svc_serv *serv = svsk->sk_server; if (!test_bit(SK_DEFERRED, &svsk->sk_flags)) return NULL; - spin_lock_bh(&serv->sv_lock); + spin_lock_bh(&svsk->sk_defer_lock); clear_bit(SK_DEFERRED, &svsk->sk_flags); if (!list_empty(&svsk->sk_deferred)) { dr = list_entry(svsk->sk_deferred.next, @@ -1561,6 +1727,6 @@ static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk) list_del_init(&dr->handle.recent); set_bit(SK_DEFERRED, &svsk->sk_flags); } - spin_unlock_bh(&serv->sv_lock); + spin_unlock_bh(&svsk->sk_defer_lock); return dr; } |