X-Git-Url: https://git.dogcows.com/gitweb?p=chaz%2Fchatty;a=blobdiff_plain;f=extra%2Fcometd%2FMeteor%2FConnection.pm;fp=extra%2Fcometd%2FMeteor%2FConnection.pm;h=19dd95923b394301e3721a75c56c7e3edbc97dca;hp=0000000000000000000000000000000000000000;hb=100d54b49cab3783276b3a470fffa5e509929daf;hpb=06da6ad7294f8293cfe3a5e77e0f676d2884cd79 diff --git a/extra/cometd/Meteor/Connection.pm b/extra/cometd/Meteor/Connection.pm new file mode 100644 index 0000000..19dd959 --- /dev/null +++ b/extra/cometd/Meteor/Connection.pm @@ -0,0 +1,312 @@ +#!/usr/bin/perl -w +############################################################################### +# Meteor +# An HTTP server for the 2.0 web +# Copyright (c) 2006 contributing authors +# +# Subscriber.pm +# +# Description: +# Common super-class for controller and subscriber +# +############################################################################### +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the Free +# Software Foundation; either version 2 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# For more information visit www.meteorserver.org +# +############################################################################### + +package Meteor::Connection; +############################################################################### +# Configuration +############################################################################### + + use strict; + + use Errno qw(EAGAIN); + + our $MAX_READ_SIZE=8192; + our $CONNECTION_WRITE_TIMEOUT=120; + + our @Connections=(); + +############################################################################### +# Class methods +############################################################################### +sub addAllHandleBits { + my $class=shift; + + my $rVecRef=shift; + my $wVecRef=shift; + my $eVecRef=shift; + + my @cons=@Connections; + map {$_->addHandleBits($rVecRef,$wVecRef,$eVecRef) if(defined($_)) } @cons; +} + +sub checkAllHandleBits { + my $class=shift; + + my $rVec=shift; + my $wVec=shift; + my $eVec=shift; + + my @cons=@Connections; + map {$_->checkHandleBits($rVec,$wVec,$eVec) if(defined($_)) } @cons; +} + +sub connectionCount { + scalar(@Connections); +} + +sub closeAllConnections { + my @cons=@Connections; + + map { $_->close(); } @cons; +} + +############################################################################### +# Factory methods +############################################################################### +sub new { + # + # Create a new empty instance + # + my $class=shift; + + my $obj={}; + + bless($obj,$class); +} + +sub newFromServer { + # + # new instance from new server connection + # + my $self=shift->new(); + + $::Statistics->{'total_requests'}++; + + my $server=shift; + my $socket=$server->conSocket(); + + $self->{'socket'}=$socket; + $self->{'socketFN'}=$socket->fileno(); + + $socket->setNonBlocking(); + + $self->{'writeBuffer'}=''; + $self->{'readBuffer'}=''; + $self->{'bytesWritten'}=0; + $self->{'ip'}=$socket->{'connection'}->{'remoteIP'}; + + push(@Connections,$self); + + &::syslog('debug',"New %s for %s",ref($self),$socket->{'connection'}->{'remoteIP'}); + + $self; +} + +############################################################################### +# Instance methods +############################################################################### +sub write { + my $self=shift; + + $self->{'writeBuffer'}.=shift; + $self->{'writeBufferTimestamp'}=time unless(exists($self->{'writeBufferTimestamp'})); +} + +sub addHandleBits { + my $self=shift; + + my $rVecRef=shift; + my $wVecRef=shift; + my $eVecRef=shift; + + my $fno=$self->{'socketFN'}; + + if($self->{'writeBuffer'} ne '') + { + if(exists($self->{'writeBufferTimestamp'}) && $self->{'writeBufferTimestamp'}+$CONNECTION_WRITE_TIMEOUT{'socket'}->{'connection'}->{'remoteIP'}); + + $self->{'writeBuffer'}=''; + $self->close(); + return; + } + vec($$wVecRef,$fno,1)=1; + } + + vec($$rVecRef,$fno,1)=1; + vec($$eVecRef,$fno,1)=1; +} + +sub checkHandleBits { + my $self=shift; + + my $rVec=shift; + my $wVec=shift; + my $eVec=shift; + + my $fno=$self->{'socketFN'}; + + if(vec($eVec,$fno,1)) + { + # + # Something went wrong! + # + $self->exceptionReceived(); + + return; + } + + if(vec($rVec,$fno,1)) + { + # + # Data available for read + # + my $socket=$self->{'socket'}; + + my $buffer=''; + my $bytesRead=sysread($socket->{'handle'},$buffer,$MAX_READ_SIZE); + if(defined($bytesRead) && $bytesRead>0) + { + $::Statistics->{'total_inbound_bytes'}+=$bytesRead; + $self->{'readBuffer'}.=$buffer; + while($self->{'readBuffer'}=~s/^([^\r\n]*)\r?\n//) + { + $self->processLine($1); + } + } + elsif(defined($bytesRead) && $bytesRead==0) + { + # Connection closed + $self->{'remoteClosed'}=1; + $self->close(1, 'remoteClosed'); + + return; + } + else + { + unless(${!}==EAGAIN) + { + &::syslog('notice',"Connection closed: $!"); + $self->{'remoteClosed'}=1; + $self->close(1, 'remoteClosed'); + + return; + } + } + } + + if(vec($wVec,$fno,1) && $self->{'writeBuffer'} ne '') + { + # + # Can write + # + my $socket=$self->{'socket'}; + + my $bytesWritten=syswrite($socket->{'handle'},$self->{'writeBuffer'}); + + if(defined($bytesWritten) && $bytesWritten>0) + { + $::Statistics->{'total_outbound_bytes'}+=$bytesWritten; + $self->{'bytesWritten'}+=$bytesWritten; + $self->{'writeBuffer'}=substr($self->{'writeBuffer'},$bytesWritten); + if(length($self->{'writeBuffer'})==0) + { + delete($self->{'writeBufferTimestamp'}); + $self->close(1) if(exists($self->{'autoClose'})); + } + else + { + $self->{'writeBufferTimestamp'}=time; + } + } + else + { + unless(${!}==EAGAIN) + { + &::syslog('notice',"Connection closed: $!"); + $self->{'remoteClosed'}=1; + $self->close(1, 'remoteClosed'); + + return; + } + } + } +} + +sub exceptionReceived { + my $self=shift; + + $self->{'writeBuffer'}=''; + + $self->close(); +} + +sub close { + my $self=shift; + + #&::syslog('debug',"Close called for %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'}); + + unless($self->{'remoteClosed'}) + { + if(!exists($self->{'autoClose'}) && length($self->{'writeBuffer'})>0) + { + $self->{'autoClose'}=1; + + &::syslog('debug',"Will close %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'}); + + return; + } + } + + eval { + $self->{'socket'}->close(); + }; + + # + # Remove connection from list of connections + # + my $idx=undef; + my $numcon = scalar(@Connections); + for(my $i=0;$i<$numcon;$i++) + { + if($Connections[$i]==$self) + { + $idx=$i; + last; + } + } + + if(defined($idx)) + { + splice(@Connections,$idx,1); + } + + &::syslog('debug',"Closed %s for %s",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'}); + + $self->didClose(); +} + +sub didClose { +} + +1; +############################################################################EOF \ No newline at end of file