--- /dev/null
+#!/usr/bin/perl -w
+###############################################################################
+# Meteor
+# An HTTP server for the 2.0 web
+# Copyright (c) 2006 contributing authors
+#
+# Subscriber.pm
+#
+# Description:
+# A Meteor Subscriber
+#
+###############################################################################
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation; either version 2 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# For more information visit www.meteorserver.org
+#
+###############################################################################
+
+package Meteor::Subscriber;
+###############################################################################
+# Configuration
+###############################################################################
+
+ use strict;
+
+ use Meteor::Connection;
+ use Meteor::Channel;
+ use Meteor::Document;
+
+ @Meteor::Subscriber::ISA=qw(Meteor::Connection);
+
+ our %PersistentConnections=();
+ our $NumAcceptedConnections=0;
+
+
+###############################################################################
+# Factory methods
+###############################################################################
+sub newFromServer {
+ my $class=shift;
+
+ my $self=$class->SUPER::newFromServer(shift);
+
+ $self->{'headerBuffer'}='';
+ $self->{'MessageCount'}=0;
+ $self->{'MaxMessageCount'}=0;
+
+ $self->{'ConnectionStart'}=time;
+ my $maxTime=$::CONF{'MaxTime'};
+ if($maxTime>0)
+ {
+ $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
+ }
+
+ $::Statistics->{'current_subscribers'}++;
+ $::Statistics->{'subscriber_connections_accepted'}++;
+
+ $self;
+}
+
+###############################################################################
+# Class methods
+###############################################################################
+sub deleteSubscriberWithID {
+ my $class=shift;
+ my $id=shift;
+
+ if(exists($PersistentConnections{$id}))
+ {
+ $PersistentConnections{$id}->close(0,'newSubscriberWithSameID');
+ }
+}
+
+sub pingPersistentConnections {
+ my $class=shift;
+
+ my @cons=values %PersistentConnections;
+
+ map { $_->ping() } @cons;
+}
+
+sub checkPersistentConnectionsForMaxTime {
+ my $class=shift;
+
+ my $time=time;
+ my @cons=values %PersistentConnections;
+
+ map { $_->checkForMaxTime($time) } @cons;
+}
+
+sub numSubscribers {
+
+ return scalar(keys %PersistentConnections);
+}
+
+###############################################################################
+# Instance methods
+###############################################################################
+sub processLine {
+ my $self=shift;
+ my $line=shift;
+
+ # Once the header was processed we ignore any input
+ return unless(exists($self->{'headerBuffer'}));
+
+ if($line ne '')
+ {
+ #
+ # Accumulate header
+ #
+ $self->{'headerBuffer'}.="$line\n";
+ }
+ else
+ {
+ #
+ # Empty line signals end of header.
+ # Analyze header, register with appropiate channel
+ # and send pending messages.
+ #
+ # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
+ #
+ # Find the 'GET' line
+ #
+ if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i)
+ {
+ $self->{'subscriberID'}=$1;
+ $self->{'mode'}=$2;
+ my $persist=$self->getConf('Persist');
+ my $maxTime=$self->getConf('MaxTime');
+ $self->{'ConnectionTimeLimit'} = ($self->{'ConnectionStart'}+$maxTime) if ($maxTime>0);
+
+ my @channelData=split('/',$3);
+ my $channels={};
+ my $channelName;
+ my $offset;
+ foreach my $chandef (@channelData) {
+ if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) {
+ $channelName = $1;
+ $channels->{$channelName}->{'startIndex'} = undef;
+ if ($3) {
+ $offset = $4;
+ if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
+ if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
+ if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
+ }
+ }
+ }
+ my $useragent = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
+
+ delete($self->{'headerBuffer'});
+
+ if ($persist) {
+ $self->deleteSubscriberWithID($self->{'subscriberID'});
+ $PersistentConnections{$self->{'subscriberID'}}=$self;
+ }
+
+ if(scalar(keys %{$channels})) {
+
+ $self->{'channelinfo'} = '';
+ my $citemplate = $self->getConf('ChannelInfoTemplate');
+ foreach $channelName (keys %{$channels}) {
+ my $channel=Meteor::Channel->channelWithName($channelName);
+ $self->{'channels'}->{$channelName}=$channel;
+ $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
+
+ }
+ $self->emitOKHeader();
+ foreach $channelName (keys %{$channels}) {
+ my $startIndex=$channels->{$channelName}->{'startIndex'};
+ $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$useragent);
+ }
+ delete ($self->{'channels'}) unless($persist);
+ $self->close(1, 'responseComplete') unless($persist);
+ return;
+ }
+ }
+ elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
+ {
+ $self->deleteSubscriberWithID($1);
+ $self->emitOKHeader();
+ $self->close(1, 'disconnectRequested');
+ return;
+ }
+ elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
+ {
+ Meteor::Document->serveFileToClient($1,$self);
+ $self->close(1, 'responseComplete');
+ return;
+ }
+
+ #
+ # If we fall through we did not understand the request
+ #
+ $self->emitErrorHeader();
+ }
+}
+
+sub emitOKHeader {
+ my $self=shift;
+
+ $self->emitHeader('200 OK');
+}
+
+sub emitErrorHeader {
+ my $self=shift;
+
+ $self->emitHeader('404 Not Found');
+ $::Statistics->{'errors_served'}++;
+
+ # close up shop here!
+ $self->close(0, 'error');
+}
+
+sub emitHeader {
+ my $self=shift;
+ my $status=shift;
+
+ my $header=$self->getConf('HeaderTemplate');
+
+ $header=~s/~([^~]*)~/
+ if(!defined($1) || $1 eq '') {
+ '~';
+ } elsif($1 eq 'server') {
+ $::PGM;
+ } elsif($1 eq 'status') {
+ $status;
+ } elsif($1 eq 'servertime') {
+ time;
+ } elsif($1 eq 'channelinfo') {
+ $self->{'channelinfo'};
+ } else {
+ '';
+ }
+ /gex;
+
+ $self->write($header);
+}
+
+sub sendMessages {
+ my $self=shift;
+
+ my $numMessages=0;
+ my $msgTemplate=$self->getConf('MessageTemplate');
+ my $msgData='';
+
+ foreach my $message (@_)
+ {
+ $msgData.=$message->messageWithTemplate($msgTemplate);
+ $numMessages++;
+ }
+
+ return if($numMessages<1);
+
+ $self->write($msgData);
+
+ $::Statistics->{'messages_served'}+=$numMessages;
+
+ my $msgCount=$self->{'MessageCount'};
+ $msgCount+=$numMessages;
+ $self->{'MessageCount'}=$msgCount;
+
+ my $maxMsg=$self->getConf('MaxMessages');
+ if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)
+ {
+ $self->close(1, 'maxMessageCountReached');
+ }
+
+ if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})
+ {
+ $self->close(1, 'maxMessageCountReached');
+ }
+
+}
+
+sub ping {
+ my $self=shift;
+ my $msg=$self->getConf('PingMessage');
+
+ $self->write($msg);
+}
+
+sub closeChannel {
+ my $self=shift;
+ my $channelName=shift;
+
+ return unless(exists($self->{'channels'}->{$channelName}));
+
+ my $channel=$self->{'channels'}->{$channelName};
+ $channel->removeSubscriber($self,'channelClose');
+
+ delete($self->{'channels'}->{$channelName});
+
+ $self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0);
+}
+
+sub close {
+ my $self=shift;
+ my $noShutdownMsg=shift;
+ my $reason=shift;
+
+ foreach my $channelName (keys %{$self->{'channels'}})
+ {
+ my $channel=$self->{'channels'}->{$channelName};
+ $channel->removeSubscriber($self,$reason);
+ }
+ delete($self->{'channels'});
+
+ # If this connection is in the PersistentConnections array, delete it, then anonymise
+ # it so that if we have to wait for the write buffer to empty before close, it's only
+ # removed once.
+ if(exists($self->{'subscriberID'})) {
+ delete($PersistentConnections{$self->{'subscriberID'}});
+ delete($self->{'subscriberID'});
+ }
+
+ # Send shutdown message unless remote closed or
+ # connection not yet established
+ unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
+ {
+ my $msg=$self->getConf('SubscriberShutdownMsg');
+ if(defined($msg) && $msg ne '')
+ {
+ $self->write($msg);
+ }
+ }
+
+ $self->SUPER::close();
+}
+
+sub didClose {
+
+ $::Statistics->{'current_subscribers'}--;
+}
+
+sub checkForMaxTime {
+ my $self=shift;
+ my $time=shift;
+
+ $self->close(1,'maxTime') if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);
+}
+
+sub getConf {
+ my $self=shift;
+ my $key=shift;
+
+ if(exists($self->{'mode'}) && $self->{'mode'} ne '')
+ {
+ my $k=$key.$self->{'mode'};
+
+ if(exists($::CONF{$k})) {
+ return $::CONF{$k};
+ }
+ }
+
+ $::CONF{$key};
+}
+
+1;
+############################################################################EOF
\ No newline at end of file