--- /dev/null
+#!/usr/bin/perl -w
+###############################################################################
+# Meteor
+# An HTTP server for the 2.0 web
+# Copyright (c) 2006 contributing authors
+#
+# Subscriber.pm
+#
+# Description:
+# A Meteor Channel
+#
+###############################################################################
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation; either version 2 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# For more information visit www.meteorserver.org
+#
+###############################################################################
+
+package Meteor::Channel;
+###############################################################################
+# Configuration
+###############################################################################
+
+ use strict;
+
+ use Meteor::Message;
+
+ our %Channels=();
+ our $MessageID=0;
+
+###############################################################################
+# Class methods
+###############################################################################
+sub channelWithName {
+ my $class=shift;
+ my $channelName=shift;
+ my $avoidCreation=shift;
+
+ unless(exists($Channels{$channelName}))
+ {
+ return undef if($avoidCreation);
+ #
+ # Create new channel
+ #
+ $Channels{$channelName}=$class->newChannel($channelName);
+
+ &::syslog('debug',"New channel $channelName");
+ }
+
+ return $Channels{$channelName};
+}
+
+sub listChannels {
+ my $class=shift;
+
+ my $list='';
+ foreach my $channelName (sort keys %Channels)
+ {
+ my $channel=$Channels{$channelName};
+
+ $list.=$channelName.'('.$channel->messageCount().'/'.$channel->subscriberCount().")$::CRLF";
+ }
+
+ $list;
+}
+
+sub deleteChannel {
+ my $class=shift;
+ my $channelName=shift;
+
+ delete($Channels{$channelName});
+}
+
+sub trimMessageStoresByTimestamp {
+ my $class=shift;
+ my $minTimeStamp=shift;
+
+ return unless($minTimeStamp);
+
+ map { $_->trimMessageStoreByTimestamp($minTimeStamp) } (values %Channels);
+}
+
+sub clearAllBuffers {
+ my $class=shift;
+
+ map { $_->clearBuffer() } (values %Channels);
+}
+
+sub numChannels {
+
+ return scalar(keys %Channels);
+}
+
+###############################################################################
+# Factory methods
+###############################################################################
+sub new {
+ #
+ # Create a new empty instance
+ #
+ my $class=shift;
+
+ my $obj={};
+
+ bless($obj,$class);
+}
+
+sub newChannel {
+ #
+ # new instance from new server connection
+ #
+ my $self=shift->new();
+
+ my $name=shift;
+ $self->{'name'}=$name;
+
+ $self->{'subscribers'}=[];
+ $self->{'messages'}=[];
+
+ $self;
+}
+
+sub DESTROY {
+ my $self=shift;
+
+ my @subscribers=@{$self->{'subscribers'}};
+ map { $_->closeChannel($self->{'name'}) } @subscribers;
+}
+
+###############################################################################
+# Instance methods
+###############################################################################
+sub name {
+ shift->{'name'};
+}
+
+sub addSubscriber {
+ my $self=shift;
+ my $subscriber=shift;
+ my $startId=shift;
+ my $persist=shift;
+ my $mode=shift || '';
+ my $userAgent=shift || '';
+
+ # Note: negative $startId means go back that many messages
+ my $startIndex=$self->indexForMessageID($startId);
+ my $logStartIndex = $startIndex || $self->lastMsgID() || 0;
+
+ push(@{$self->{'subscribers'}},$subscriber) if($persist);
+
+ &::syslog('info','',
+ 'joinchannel',
+ $subscriber->{'ip'},
+ $subscriber->{'subscriberID'},
+ $self->{'name'},
+ $mode,
+ $logStartIndex,
+ $userAgent
+ );
+
+ return unless(defined($startIndex));
+
+ my $msgCount=scalar(@{$self->{'messages'}});
+ my $txt='';
+
+ $startIndex=0 if($startIndex<0);
+
+ if($startIndex<$msgCount) {
+ $subscriber->sendMessages(@{$self->{'messages'}}[$startIndex..$msgCount-1]);
+ }
+}
+
+sub removeSubscriber {
+ my $self=shift;
+ my $subscriber=shift;
+ my $reason=shift ||'unknown';
+
+ my $idx=undef;
+ my $numsubs = scalar(@{$self->{'subscribers'}});
+
+ for (my $i=0; $i<$numsubs; $i++) {
+ if($self->{'subscribers'}->[$i]==$subscriber) {
+ $idx=$i;
+ last;
+ }
+ }
+
+ if(defined($idx))
+ {
+ splice(@{$self->{'subscribers'}},$idx,1);
+
+ my $timeConnected = time - $subscriber->{'ConnectionStart'};
+ &::syslog('info','',
+ 'leavechannel',
+ $subscriber->{'ip'},
+ $subscriber->{'subscriberID'},
+ $self->{'name'},
+ $timeConnected,
+ $subscriber->{'MessageCount'},
+ $subscriber->{'bytesWritten'},
+ $reason
+ );
+ }
+
+ $self->checkExpiration();
+}
+
+sub subscriberCount {
+ my $self=shift;
+
+ scalar(@{$self->{'subscribers'}});
+}
+
+sub addMessage {
+ my $self=shift;
+ my $messageText=shift;
+
+ my $message=Meteor::Message->newWithID($MessageID++);
+ $message->setText($messageText);
+ $message->setChannelName($self->{'name'});
+ push(@{$self->{'messages'}},$message);
+ &::syslog('debug',"New message ".$message->{"id"}." on channel ".$self->{'name'});
+
+ $self->trimMessageStoreBySize();
+
+ map { $_->sendMessages($message) } @{$self->{'subscribers'}};
+
+ $message;
+}
+
+sub messageCount {
+ my $self=shift;
+
+ scalar(@{$self->{'messages'}});
+}
+
+sub trimMessageStoreBySize {
+ my $self=shift;
+
+ my $numMessages=scalar(@{$self->{'messages'}});
+
+ if($numMessages>$::CONF{'MaxMessagesPerChannel'})
+ {
+ splice(@{$self->{'messages'}},0,-$::CONF{'MaxMessagesPerChannel'});
+ }
+}
+
+sub trimMessageStoreByTimestamp {
+ my $self=shift;
+ my $ts=shift;
+
+ while(scalar(@{$self->{'messages'}})>0 && $self->{'messages'}->[0]->timestamp()<$ts)
+ {
+ my $msg=shift(@{$self->{'messages'}});
+ }
+
+ $self->checkExpiration();
+}
+
+sub clearBuffer {
+ my $self=shift;
+
+ $self->{'messages'}=[];
+
+ $self->checkExpiration();
+}
+
+sub checkExpiration {
+ my $self=shift;
+
+ if($self->messageCount()==0 && $self->subscriberCount()==0)
+ {
+ my $name=$self->name();
+ &::syslog('debug',"Channel expired: $name");
+ $self->deleteChannel($name);
+ }
+}
+
+sub indexForMessageID {
+ my $self=shift;
+ my $id=shift;
+
+ # the messages is always sorted by ID, so we can
+ # use a binary search to find the message.
+ # return undef if there are no messages or the
+ # ID is that of the last message.
+ # Otherwise return the ID of the found message
+ # of if no message with that ID exists the one
+ # with the next higher ID
+ #
+ return undef unless(defined($id));
+
+ my $numMessages=scalar(@{$self->{'messages'}});
+
+ return undef unless($numMessages);
+ return -1 unless($id ne '');
+
+ # Note: negative $id means go back that many messages
+ return $numMessages+$id if($id<0);
+
+ my $low=0;
+ my $high=$numMessages-1;
+ my $mid;
+ my $cond;
+ while($low<=$high)
+ {
+ $mid=($low+$high)>>1;
+ $cond=$id <=> $self->{'messages'}->[$mid]->id();
+ if($cond<0)
+ {
+ $high=$mid-1;
+ }
+ elsif($cond>0)
+ {
+ $low=$mid+1;
+ }
+ else
+ {
+ return $mid;
+ }
+ }
+
+ return undef if($low>=$numMessages);
+
+ return $low;
+}
+
+sub lastMsgID {
+ my $self=shift;
+ my $numMessages=scalar(@{$self->{'messages'}});
+ return undef unless($numMessages>0);
+ @{$self->{'messages'}}[-1]->id();
+}
+
+sub descriptionWithTemplate {
+ my $self=shift;
+ my $template=shift;
+
+ return '' unless(defined($template) && $template ne '');
+
+ $template=~s/~([a-zA-Z0-9_]*)~/
+ if(!defined($1) || $1 eq '') {
+ '~';
+ } elsif($1 eq 'messageCount') {
+ $self->messageCount();
+ } elsif($1 eq 'subscriberCount') {
+ $self->subscriberCount();
+ } elsif($1 eq 'lastMsgID') {
+ $self->lastMsgID() || 0;
+ } elsif($1 eq 'name') {
+ $self->{'name'};
+ } else {
+ '';
+ }
+ /gex;
+
+ $template;
+}
+
+1;
+############################################################################EOF
\ No newline at end of file