]> Dogcows Code - chaz/chatty/blobdiff - extra/cometd/Meteor/Subscriber.pm
import meteord-1.06
[chaz/chatty] / extra / cometd / Meteor / Subscriber.pm
diff --git a/extra/cometd/Meteor/Subscriber.pm b/extra/cometd/Meteor/Subscriber.pm
new file mode 100644 (file)
index 0000000..cb6269b
--- /dev/null
@@ -0,0 +1,372 @@
+#!/usr/bin/perl -w
+###############################################################################
+#   Meteor
+#   An HTTP server for the 2.0 web
+#   Copyright (c) 2006 contributing authors
+#
+#   Subscriber.pm
+#
+#      Description:
+#      A Meteor Subscriber
+#
+###############################################################################
+#
+#   This program is free software; you can redistribute it and/or modify it
+#   under the terms of the GNU General Public License as published by the Free
+#   Software Foundation; either version 2 of the License, or (at your option)
+#   any later version.
+#
+#   This program is distributed in the hope that it will be useful, but WITHOUT
+#   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+#   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+#   more details.
+#
+#   You should have received a copy of the GNU General Public License along
+#   with this program; if not, write to the Free Software Foundation, Inc.,
+#   59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+#   For more information visit www.meteorserver.org
+#
+###############################################################################
+
+package Meteor::Subscriber;
+###############################################################################
+# Configuration
+###############################################################################
+       
+       use strict;
+       
+       use Meteor::Connection;
+       use Meteor::Channel;
+       use Meteor::Document;
+       
+       @Meteor::Subscriber::ISA=qw(Meteor::Connection);
+       
+       our %PersistentConnections=();
+       our $NumAcceptedConnections=0;
+    
+
+###############################################################################
+# Factory methods
+###############################################################################
+sub newFromServer {
+       my $class=shift;
+       
+       my $self=$class->SUPER::newFromServer(shift);
+       
+       $self->{'headerBuffer'}='';
+       $self->{'MessageCount'}=0;
+       $self->{'MaxMessageCount'}=0;
+       
+       $self->{'ConnectionStart'}=time;
+       my $maxTime=$::CONF{'MaxTime'};
+       if($maxTime>0)
+       {
+               $self->{'ConnectionTimeLimit'}=$self->{'ConnectionStart'}+$maxTime;
+       }
+       
+       $::Statistics->{'current_subscribers'}++;
+       $::Statistics->{'subscriber_connections_accepted'}++;
+       
+       $self;
+}
+
+###############################################################################
+# Class methods
+###############################################################################
+sub deleteSubscriberWithID {
+       my $class=shift;
+       my $id=shift;
+       
+       if(exists($PersistentConnections{$id}))
+       {
+               $PersistentConnections{$id}->close(0,'newSubscriberWithSameID');
+       }
+}
+
+sub pingPersistentConnections {
+       my $class=shift;
+       
+       my @cons=values %PersistentConnections;
+       
+       map { $_->ping() } @cons;
+}
+
+sub checkPersistentConnectionsForMaxTime {
+       my $class=shift;
+       
+       my $time=time;
+       my @cons=values %PersistentConnections;
+       
+       map { $_->checkForMaxTime($time) } @cons;
+}
+
+sub numSubscribers {
+       
+       return scalar(keys %PersistentConnections);
+}
+
+###############################################################################
+# Instance methods
+###############################################################################
+sub processLine {
+       my $self=shift;
+       my $line=shift;
+       
+       # Once the header was processed we ignore any input
+       return unless(exists($self->{'headerBuffer'}));
+       
+       if($line ne '')
+       {
+               #
+               # Accumulate header
+               #
+               $self->{'headerBuffer'}.="$line\n";
+       }
+       else
+       {
+               #
+               # Empty line signals end of header.
+               # Analyze header, register with appropiate channel
+               # and send pending messages.
+               #
+               # GET $::CONF{'SubscriberDynamicPageAddress'}/hostid/streamtype/channeldefs HTTP/1.1
+               #
+               # Find the 'GET' line
+               #
+               if($self->{'headerBuffer'}=~/GET\s+$::CONF{'SubscriberDynamicPageAddress'}\/([0-9a-z]+)\/([0-9a-z]+)\/([a-z0-9_\-\%\.\/]+).*?/i)
+               {
+                       $self->{'subscriberID'}=$1;
+                       $self->{'mode'}=$2;
+                       my $persist=$self->getConf('Persist');
+                       my $maxTime=$self->getConf('MaxTime');
+                       $self->{'ConnectionTimeLimit'} = ($self->{'ConnectionStart'}+$maxTime) if ($maxTime>0);
+                       
+                       my @channelData=split('/',$3);
+                       my $channels={};
+                       my $channelName;
+                       my $offset;
+                       foreach my $chandef (@channelData) {
+                               if($chandef=~/^([a-z0-9_\-\%]+)(.(r|b|h)([0-9]*))?$/i) {
+                                       $channelName = $1;
+                                       $channels->{$channelName}->{'startIndex'} = undef;
+                                       if ($3) {
+                                          $offset = $4;
+                                          if ($3 eq 'r') { $channels->{$channelName}->{'startIndex'} = $offset; }
+                                          if ($3 eq 'b') { $channels->{$channelName}->{'startIndex'} = -$offset; }
+                                          if ($3 eq 'h') { $channels->{$channelName}->{'startIndex'} = 0; }
+                                       }
+                               }
+                       }
+                       my $useragent = ($self->{'headerBuffer'}=~/User-Agent: (.+)/i) ? $1 : "-";
+                       
+                       delete($self->{'headerBuffer'});
+                       
+                       if ($persist) {
+                               $self->deleteSubscriberWithID($self->{'subscriberID'});
+                               $PersistentConnections{$self->{'subscriberID'}}=$self;
+                       }
+                       
+                       if(scalar(keys %{$channels})) {
+
+                               $self->{'channelinfo'} = '';
+                               my $citemplate = $self->getConf('ChannelInfoTemplate');
+                               foreach $channelName (keys %{$channels}) {
+                                       my $channel=Meteor::Channel->channelWithName($channelName);
+                                       $self->{'channels'}->{$channelName}=$channel;
+                                       $self->{'channelinfo'} .= $channel->descriptionWithTemplate($citemplate);
+                                       
+                               }
+                               $self->emitOKHeader();
+                               foreach $channelName (keys %{$channels}) {
+                                       my $startIndex=$channels->{$channelName}->{'startIndex'};
+                                       $self->{'channels'}->{$channelName}->addSubscriber($self,$startIndex,$persist,$self->{'mode'},$useragent);
+                               }
+                               delete ($self->{'channels'}) unless($persist);
+                               $self->close(1, 'responseComplete') unless($persist);
+                               return;
+                       }
+               }
+               elsif($self->{'headerBuffer'}=~/GET\s+\/disconnect\/(\S+)/)
+               {
+                       $self->deleteSubscriberWithID($1);
+                       $self->emitOKHeader();
+                       $self->close(1, 'disconnectRequested');
+                       return;
+               }
+               elsif($self->{'headerBuffer'}=~/GET\s+([^\s\?]+)/)
+               {
+                       Meteor::Document->serveFileToClient($1,$self);
+                       $self->close(1, 'responseComplete');
+                       return;
+               }
+               
+               #
+               # If we fall through we did not understand the request
+               #
+               $self->emitErrorHeader();
+       }
+}
+
+sub emitOKHeader {
+       my $self=shift;
+       
+       $self->emitHeader('200 OK');
+}
+
+sub emitErrorHeader {
+       my $self=shift;
+       
+       $self->emitHeader('404 Not Found');
+       $::Statistics->{'errors_served'}++;
+       
+       # close up shop here!
+       $self->close(0, 'error');
+}
+
+sub emitHeader {
+       my $self=shift;
+       my $status=shift;
+       
+       my $header=$self->getConf('HeaderTemplate');
+       
+       $header=~s/~([^~]*)~/
+               if(!defined($1) || $1 eq '') {
+                       '~';
+               } elsif($1 eq 'server') {
+                       $::PGM;
+               } elsif($1 eq 'status') {
+                       $status;
+               } elsif($1 eq 'servertime') {
+                       time;
+               } elsif($1 eq 'channelinfo') {
+                       $self->{'channelinfo'};
+               } else {
+                       '';
+               }
+       /gex;
+       
+       $self->write($header);
+}
+
+sub sendMessages {
+       my $self=shift;
+       
+       my $numMessages=0;
+       my $msgTemplate=$self->getConf('MessageTemplate');
+       my $msgData='';
+       
+       foreach my $message (@_)
+       {
+               $msgData.=$message->messageWithTemplate($msgTemplate);
+               $numMessages++;
+       }
+       
+       return if($numMessages<1);
+       
+       $self->write($msgData);
+       
+       $::Statistics->{'messages_served'}+=$numMessages;
+       
+       my $msgCount=$self->{'MessageCount'};
+       $msgCount+=$numMessages;
+       $self->{'MessageCount'}=$msgCount;
+       
+       my $maxMsg=$self->getConf('MaxMessages');
+       if(defined($maxMsg) && $maxMsg>0 && $msgCount>=$maxMsg)
+       {
+               $self->close(1, 'maxMessageCountReached');
+       }
+       
+       if($self->{'MaxMessageCount'}>0 && $msgCount>=$self->{'MaxMessageCount'})
+       {
+               $self->close(1, 'maxMessageCountReached');
+       }
+       
+}
+
+sub ping {
+       my $self=shift;
+       my $msg=$self->getConf('PingMessage');
+       
+       $self->write($msg);
+}
+
+sub closeChannel {
+       my $self=shift;
+       my $channelName=shift;
+       
+       return unless(exists($self->{'channels'}->{$channelName}));
+       
+       my $channel=$self->{'channels'}->{$channelName};
+       $channel->removeSubscriber($self,'channelClose');
+       
+       delete($self->{'channels'}->{$channelName});
+       
+       $self->close(0,'channelClose') if(scalar(keys %{$self->{'channels'}})==0);
+}
+
+sub close {
+       my $self=shift;
+       my $noShutdownMsg=shift;
+       my $reason=shift;
+       
+       foreach my $channelName (keys %{$self->{'channels'}})
+       {
+               my $channel=$self->{'channels'}->{$channelName};
+               $channel->removeSubscriber($self,$reason);
+       }
+       delete($self->{'channels'});
+       
+       # If this connection is in the PersistentConnections array, delete it, then anonymise 
+       # it so that if we have to wait for the write buffer to empty before close, it's only
+       # removed once.
+       if(exists($self->{'subscriberID'})) {
+               delete($PersistentConnections{$self->{'subscriberID'}});
+               delete($self->{'subscriberID'});
+       }
+       
+       # Send shutdown message unless remote closed or
+       # connection not yet established
+       unless($noShutdownMsg || $self->{'remoteClosed'} || exists($self->{'headerBuffer'}))
+       {
+               my $msg=$self->getConf('SubscriberShutdownMsg');
+               if(defined($msg) && $msg ne '')
+               {
+                       $self->write($msg);
+               }
+       }
+       
+       $self->SUPER::close();
+}
+
+sub didClose {
+       
+       $::Statistics->{'current_subscribers'}--;
+}
+
+sub checkForMaxTime {
+       my $self=shift;
+       my $time=shift;
+       
+       $self->close(1,'maxTime') if(exists($self->{'ConnectionTimeLimit'}) && $self->{'ConnectionTimeLimit'}<$time);
+}
+
+sub getConf {
+       my $self=shift;
+       my $key=shift;
+       
+       if(exists($self->{'mode'}) && $self->{'mode'} ne '')
+       {
+               my $k=$key.$self->{'mode'};
+               
+               if(exists($::CONF{$k})) {
+                       return $::CONF{$k};
+               }
+       }
+       
+       $::CONF{$key};
+}
+
+1;
+############################################################################EOF
\ No newline at end of file
This page took 0.023068 seconds and 4 git commands to generate.