X-Git-Url: https://git.dogcows.com/gitweb?p=chaz%2Fchatty;a=blobdiff_plain;f=extra%2Fcometd%2FMeteor%2FSubscriber.pm;fp=extra%2Fcometd%2FMeteor%2FSubscriber.pm;h=cb6269b00f1914b53cdd0e9ef73d54dae8a5e563;hp=0000000000000000000000000000000000000000;hb=100d54b49cab3783276b3a470fffa5e509929daf;hpb=06da6ad7294f8293cfe3a5e77e0f676d2884cd79 diff --git a/extra/cometd/Meteor/Subscriber.pm b/extra/cometd/Meteor/Subscriber.pm new file mode 100644 index 0000000..cb6269b --- /dev/null +++ b/extra/cometd/Meteor/Subscriber.pm @@ -0,0 +1,372 @@ +#!/usr/bin/perl -w +############################################################################### +# Meteor +# An HTTP server for the 2.0 web +# Copyright (c) 2006 contributing authors +# +# Subscriber.pm +# +# Description: +# A Meteor Subscriber +# +############################################################################### +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the Free +# Software Foundation; either version 2 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# For more information visit www.meteorserver.org +# +############################################################################### + +package Meteor::Subscriber; +############################################################################### +# Configuration +############################################################################### + + use strict; + + use Meteor::Connection; + use Meteor::Channel; + use Meteor::Document; + + @Meteor::Subscriber::ISA=qw(Meteor::Connection); + + our %PersistentConnections=(); + our $NumAcceptedConnections=0; + + +############################################################################### +# Factory methods +############################################################################### +sub newFromServer { + my $class=shift; + + my $self=$class->SUPER::newFromServer(shift); + + $self->{'headerBuffer'}=''; + $self->{'MessageCount'}=0; + $self->{'MaxMessageCount'}=0; + + $self->{'ConnectionStart'}=time; + my $maxTime=$::CONF{'MaxTime'}; + if($maxTime>0) + { + $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime; + } + + $::Statistics->{'current_subscribers'}++; + $::Statistics->{'subscriber_connections_accepted'}++; + + $self; +} + +############################################################################### +# Class methods +############################################################################### +sub deleteSubscriberWithID { + my $class=shift; + my $id=shift; + + if(exists($PersistentConnections{$id})) + { + $PersistentConnections{$id}->close(0,'newSubscriberWithSameID'); + } +} + +sub pingPersistentConnections { + my $class=shift; + + my @cons=values %PersistentConnections; + + map { $_->ping() } @cons; +} + +sub checkPersistentConnectionsForMaxTime { + my $class=shift; + + my $time=time; + my @cons=values %PersistentConnections; + + map { $_->checkForMaxTime($time) } @cons; +} + +sub numSubscribers { + + return scalar(keys %PersistentConnections); +} + +############################################################################### +# Instance methods +############################################################################### +sub processLine { + my $self=shift; + my $line=shift; + + # Once the header was processed we ignore any input + return unless(exists($self->{'headerBuffer'})); + + if($line ne '') + { + # + # Accumulate header + # + $self->{'headerBuffer'}.="$line\n"; + } + else + { + # + # Empty line signals end of header. + # Analyze header, register with appropiate channel + # and send pending messages. + # + # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1 + # + # Find the 'GET' line + # + if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i) + { + $self->{'subscriberID'}=$1; + $self->{'mode'}=$2; + my $persist=$self->getConf('Persist'); + my $maxTime=$self->getConf('MaxTime'); + $self->{'ConnectionTimeLimit'} = ($self->{'ConnectionStart'}+$maxTime) if ($maxTime>0); + + my @channelData=split('/',$3); + my $channels={}; + my $channelName; + my $offset; + foreach my $chandef (@channelData) { + if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) { + $channelName = $1; + $channels->{$channelName}->{'startIndex'} = undef; + if ($3) { + $offset = $4; + if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; } + if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; } + if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; } + } + } + } + my $useragent = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-"; + + delete($self->{'headerBuffer'}); + + if ($persist) { + $self->deleteSubscriberWithID($self->{'subscriberID'}); + $PersistentConnections{$self->{'subscriberID'}}=$self; + } + + if(scalar(keys %{$channels})) { + + $self->{'channelinfo'} = ''; + my $citemplate = $self->getConf('ChannelInfoTemplate'); + foreach $channelName (keys %{$channels}) { + my $channel=Meteor::Channel->channelWithName($channelName); + $self->{'channels'}->{$channelName}=$channel; + $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate); + + } + $self->emitOKHeader(); + foreach $channelName (keys %{$channels}) { + my $startIndex=$channels->{$channelName}->{'startIndex'}; + $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$useragent); + } + delete ($self->{'channels'}) unless($persist); + $self->close(1, 'responseComplete') unless($persist); + return; + } + } + elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/) + { + $self->deleteSubscriberWithID($1); + $self->emitOKHeader(); + $self->close(1, 'disconnectRequested'); + return; + } + elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/) + { + Meteor::Document->serveFileToClient($1,$self); + $self->close(1, 'responseComplete'); + return; + } + + # + # If we fall through we did not understand the request + # + $self->emitErrorHeader(); + } +} + +sub emitOKHeader { + my $self=shift; + + $self->emitHeader('200 OK'); +} + +sub emitErrorHeader { + my $self=shift; + + $self->emitHeader('404 Not Found'); + $::Statistics->{'errors_served'}++; + + # close up shop here! + $self->close(0, 'error'); +} + +sub emitHeader { + my $self=shift; + my $status=shift; + + my $header=$self->getConf('HeaderTemplate'); + + $header=~s/~([^~]*)~/ + if(!defined($1) || $1 eq '') { + '~'; + } elsif($1 eq 'server') { + $::PGM; + } elsif($1 eq 'status') { + $status; + } elsif($1 eq 'servertime') { + time; + } elsif($1 eq 'channelinfo') { + $self->{'channelinfo'}; + } else { + ''; + } + /gex; + + $self->write($header); +} + +sub sendMessages { + my $self=shift; + + my $numMessages=0; + my $msgTemplate=$self->getConf('MessageTemplate'); + my $msgData=''; + + foreach my $message (@_) + { + $msgData.=$message->messageWithTemplate($msgTemplate); + $numMessages++; + } + + return if($numMessages<1); + + $self->write($msgData); + + $::Statistics->{'messages_served'}+=$numMessages; + + my $msgCount=$self->{'MessageCount'}; + $msgCount+=$numMessages; + $self->{'MessageCount'}=$msgCount; + + my $maxMsg=$self->getConf('MaxMessages'); + if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg) + { + $self->close(1, 'maxMessageCountReached'); + } + + if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'}) + { + $self->close(1, 'maxMessageCountReached'); + } + +} + +sub ping { + my $self=shift; + my $msg=$self->getConf('PingMessage'); + + $self->write($msg); +} + +sub closeChannel { + my $self=shift; + my $channelName=shift; + + return unless(exists($self->{'channels'}->{$channelName})); + + my $channel=$self->{'channels'}->{$channelName}; + $channel->removeSubscriber($self,'channelClose'); + + delete($self->{'channels'}->{$channelName}); + + $self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0); +} + +sub close { + my $self=shift; + my $noShutdownMsg=shift; + my $reason=shift; + + foreach my $channelName (keys %{$self->{'channels'}}) + { + my $channel=$self->{'channels'}->{$channelName}; + $channel->removeSubscriber($self,$reason); + } + delete($self->{'channels'}); + + # If this connection is in the PersistentConnections array, delete it, then anonymise + # it so that if we have to wait for the write buffer to empty before close, it's only + # removed once. + if(exists($self->{'subscriberID'})) { + delete($PersistentConnections{$self->{'subscriberID'}}); + delete($self->{'subscriberID'}); + } + + # Send shutdown message unless remote closed or + # connection not yet established + unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'})) + { + my $msg=$self->getConf('SubscriberShutdownMsg'); + if(defined($msg) && $msg ne '') + { + $self->write($msg); + } + } + + $self->SUPER::close(); +} + +sub didClose { + + $::Statistics->{'current_subscribers'}--; +} + +sub checkForMaxTime { + my $self=shift; + my $time=shift; + + $self->close(1,'maxTime') if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time); +} + +sub getConf { + my $self=shift; + my $key=shift; + + if(exists($self->{'mode'}) && $self->{'mode'} ne '') + { + my $k=$key.$self->{'mode'}; + + if(exists($::CONF{$k})) { + return $::CONF{$k}; + } + } + + $::CONF{$key}; +} + +1; +############################################################################EOF \ No newline at end of file