Private GIT

Skip to content
Snippets Groups Projects
Commit 5c140dd2 authored by Maximilien Bersoult's avatar Maximilien Bersoult
Browse files

* First commit for CentreonD

parent 45ebf7f9
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/perl
use strict;
use Getopt::Long;
use Config::IniFiles;
use Sys::Hostname;
use DBI;
use Module::Util qw(:all);
use centreon::common::logger;
use centreon::daemon::crypt;
use centreon::daemon::processus;
use centreon::daemon::events qw(emit);
my $configFile = '/etc/centreon/centreon.ini';
my $keygen = 0;
GetOptions(
"config=s" => \$configFile,
"keygen" => \$keygen
) or die("Error in command line arguments.");
if (!-f $configFile) {
print("The configuration file does not exists.\n");
exit 2;
}
my $config = Config::IniFiles->new(
-file => $configFile,
-nomultiline => 1
);
# Generate keyfile if ask
if ($keygen) {
centreon::daemon::crypt::generateKeys("/tmp", "central", "test");
exit 1;
}
# List of server type
#
# central : Run server in Central with database
# poller : Run server to a poller
my $serverType = "central";
if ($config->exists("centreond", "type")) {
$serverType = $config->val("centreond", "type");
}
my $pollername = hostname();
# Initialize log file
our $logger = centreon::common::logger->new();
if ($config->exists("centreond", "log_lvl")) {
$logger->severity($config->val("centreond", "log_lvl"));
}
if ($config->exists("centreond", "log_file")) {
$logger->file_mode($config->val("centreond", "log_file"));
}
# Load module
my @modules;
if ($config->exists("centreond", "module")) {
@modules = $config->val("centreond", "module");
}
$logger->writeLogInfo("Initialize daemon");
foreach my $module (@modules) {
if (find_installed($module)) {
$logger->writeLogInfo("Load module : $module");
eval "use ${module}";
my $registerFunc = "${module}::register_events";
${module}->register_events($serverType);
}
}
if ($serverType eq "central") {
$logger->writeLogInfo("Initialize for Central");
my @childrenPid;
# Open connection to database
# if (!$config->exists("db_centreon", "dsn")) {
# print("The database is not configured.\n");
# exit 2;
# }
# my $dbh = DBI->connect(
# $config->val("db_centreon", "dsn"),
# $config->val("db_centreon", "username"),
# $config->val("db_centreon", "password")
# );
my @pollers;
# Pollers for test
my %poller1 = (
"name" => "TEST1",
"url" => "tcp://127.0.0.1:5557"
);
push(@pollers, \%poller1);
# Fork for router
my $routerPid = fork();
if ($routerPid) {
push(@childrenPid, $routerPid);
} elsif ($routerPid == 0) {
centreon::daemon::processus::runRouter();
} else {
$logger->writeLogError("Error when fork process.");
$logger->writeLogDebug($!);
exit 2;
}
sleep(1);
# Fork for action listener
my $actionListenPid = fork();
if ($actionListenPid) {
push(@childrenPid, $actionListenPid);
} elsif ($actionListenPid == 0) {
centreon::daemon::processus::runActionListener(5555);
} else {
$logger->writeLogError("Error when fork process.");
$logger->writeLogDebug($!);
exit 2;
}
# Fork for poller listener
my $pollerListenPid = fork();
if ($pollerListenPid) {
push(@childrenPid, $actionListenPid);
} elsif ($pollerListenPid == 0) {
centreon::daemon::processus::runPollerListener(5556);
} else {
$logger->writeLogError("Error when fork process.");
$logger->writeLogDebug($!);
exit 2;
}
# Connect to poller
foreach my $poller (@pollers) {
sleep(1);
my $pollerConnectPid = fork();
if ($pollerConnectPid) {
push(@childrenPid, $pollerConnectPid);
} elsif ($pollerConnectPid == 0) {
centreon::daemon::processus::runConnectToPoller(
$poller->{"name"},
$poller->{"url"}
);
}
}
while (wait() != -1) {}
} else {
centreon::daemon::processus::runPoller(5557);
}
[centreond]
type=poller
module=Centreond::Modules::CentreonEngine
[centreond]
type=central
module=Centreond::Modules::CentreonEngine
......@@ -69,9 +69,13 @@ my %severities = (1 => LOG_INFO,
2 => LOG_ERR,
4 => LOG_DEBUG);
my $singleton = undef;
sub new {
my $class = shift;
return $singleton if defined $singleton;
my $self = bless
{
file => 0,
......@@ -85,7 +89,8 @@ sub new {
log_facility => undef,
log_option => LOG_PID,
}, $class;
return $self;
$singleton = $self;
return $singleton;
}
sub file_mode($$) {
......
package centreon::daemon::crypt;
use strict;
use warnings;
use Crypt::OpenSSL::Random;
use Crypt::OpenSSL::RSA;
sub generateKeys($$$) {
my $path = shift;
my $name = shift;
my $passwd = shift;
Crypt::OpenSSL::Random::random_seed($passwd);
Crypt::OpenSSL::RSA->import_random_seed();
my $rsa = Crypt::OpenSSL::RSA->generate_key(1024);
print $rsa->get_public_key_x509_string();
}
sub getSymetricKey() {
}
1;
package centreon::daemon::events;
use strict;
use warnings;
use Exporter;
our @ISA = qw(Exporter);
our @EXPORT = qw(&on &emit);
our %events;
sub on {
my ($name, $callback) = @_;
push(@{$events{$name}}, $callback);
}
sub emit {
my $name = shift;
my @args = @_;
foreach (@{$events{$name}}) {
$_->(@args);
}
}
1;
package centreon::daemon::processus;
use strict;
use warnings;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(:all);
use centreon::common::logger;
use centreon::daemon::zmq;
use constant MAX_SIZE_BUF => 256;
my $logger;
BEGIN {
$logger = centreon::common::logger->new();
}
sub runRouter() {
my @connectedToDealer;
my $context = zmq_init();
my $socket = zmq_socket($context, ZMQ_ROUTER);
zmq_setsockopt($socket, ZMQ_IDENTITY, "ROUTER");
if (zmq_bind($socket, "ipc://router.ipc") != 0) {
$logger->writeLogError("ROUTER::Error to initialize socket");
$logger->writeLogDebug("ROUTER::" . $!);
return;
}
$logger->writeLogInfo("ROUTER::Initialized");
while (1) {
my $message = centreon::daemon::zmq::recv_router($socket);
print $message->{"message"} . "\n";
## Test redirect poller TEST1
zmq_send($socket, "TEST1", -1, ZMQ_SNDMORE);
zmq_send($socket, "", -1, ZMQ_SNDMORE);
zmq_send($socket, "test", -1);
}
}
sub runActionListener($) {
my $port = shift;
my $context = zmq_init();
my $toRouter = zmq_socket($context, ZMQ_ROUTER);
zmq_setsockopt($toRouter, ZMQ_IDENTITY, "ACTION_LISTENER");
my $ret = zmq_connect($toRouter, "ipc://router.ipc");
if ($ret != 0) {
$logger->writeLogError("ACTION_LISTENER::Error to initialize socket");
$logger->writeLogDebug("ACTION_LISTENER::" . $!);
return;
}
my $listen = zmq_socket($context, ZMQ_REP);
zmq_bind($listen, "tcp://*:$port");
$logger->writeLogInfo("ACTION_LISTENER::Initialized");
while (1) {
my $buf;
my $sizeBuf = zmq_recv($listen, $buf, MAX_SIZE_BUF);
if ($sizeBuf > 0) {
zmq_send($listen, "ASYNC");
zmq_send($toRouter, "ROUTER", -1, ZMQ_SNDMORE);
zmq_send($toRouter, "", -1, ZMQ_SNDMORE);
zmq_send($toRouter, $buf, -1);
}
}
}
sub runPollerListener($) {
my $port = shift;
my $context = zmq_init();
my $toRouter = zmq_socket($context, ZMQ_ROUTER);
zmq_setsockopt($toRouter, ZMQ_IDENTITY, "POLLER_LISTENER");
my $ret = zmq_connect($toRouter, "ipc://router.ipc");
if ($ret != 0) {
$logger->writeLogError("POLLER_LISTENER::Error to initialize socket");
$logger->writeLogDebug("POLLER_LISTENER::" . $!);
return;
}
my $listen = zmq_socket($context, ZMQ_REP);
zmq_bind($listen, "tcp://*:$port");
my @poll = (
{
socket => $toRouter,
events => ZMQ_POLLIN,
callback => sub {
my $message = centreon::daemon::zmq::recv_router($toRouter);
}
},
{
socket => $listen,
events => ZMQ_POLLIN,
callback => sub {
my $message = centreon::daemon::zmq::recv_router($listen);
zmq_send($listen, "ASYNC", -1);
zmq_send($toRouter, "ROUTER", -1, ZMQ_SNDMORE);
zmq_send($toRouter, "", -1, ZMQ_SNDMORE);
zmq_send($toRouter, $message->{"source"} . "\n", -1, ZMQ_SNDMORE);
zmq_send($toRouter, "\n", -1, ZMQ_SNDMORE);
zmq_send($toRouter, $message->{"message"}, -1);
}
}
);
$logger->writeLogInfo("POLLER_LISTENER::Initialized");
while (1) {
zmq_poll(\@poll);
}
}
sub runConnectToPoller($$) {
my $name = shift;
my $url = shift;
my $context = zmq_init();
my $toRouter = zmq_socket($context, ZMQ_ROUTER);
zmq_setsockopt($toRouter, ZMQ_IDENTITY, $name);
my $ret = zmq_connect($toRouter, "ipc://router.ipc");
if ($ret != 0) {
$logger->writeLogError("${name}::Error to initialize socket");
$logger->writeLogDebug("${name}::" . $!);
return;
}
my $socket = zmq_socket($context, ZMQ_REQ);
zmq_setsockopt($socket, ZMQ_RECONNECT_IVL, 5000);
zmq_connect($socket, $url);
my @poll = (
{
socket => $toRouter,
events => ZMQ_POLLIN,
callback => sub {
my $message = centreon::daemon::zmq::recv_router($toRouter);
zmq_send($socket, $message->{"message"}, -1);
}
},
{
socket => $socket,
events => ZMQ_POLLIN,
callback => sub {
my $message = centreon::daemon::zmq::recv_router($socket);
#zmq_send($toRouter, "ROUTER", -1, ZMQ_SNDMORE);
#zmq_send($toRouter, "", -1, ZMQ_SNDMORE);
#zmq_send($toRouter, $message->{"source"} . "\n", -1, ZMQ_SNDMORE);
#zmq_send($toRouter, "\n", -1, ZMQ_SNDMORE);
#zmq_send($toRouter, $message->{"message"}, -1);
}
}
);
$logger->writeLogInfo("${name}::Initialized");
while (1) {
zmq_poll(\@poll);
}
}
sub runPoller($) {
# Open socket for listen
my $context = zmq_init();
my $listen = zmq_socket($context, ZMQ_REP);
zmq_bind($listen, "tcp://*:5557");
while (1) {
#my $message = centreon::daemon::zmq::recv_router($listen);
my $buf;
my $sizeBuf = zmq_recv($listen, $buf, MAX_SIZE_BUF);
if ($sizeBuf > 0) {
zmq_send($listen, "ASYNC");
print "$buf\n";
}
}
}
1;
package centreon::daemon::zmq;
use strict;
use warnings;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(:all);
sub recv_router($) {
my $socket = shift;
my %message;
my $source = undef;
my $inMessage = 0;
my $str = "";
while (1) {
my $stream = zmq_recvmsg($socket);
last if (!defined $stream);
my $size = zmq_msg_size($stream);
my $data = zmq_msg_data($stream);
if (!defined $source) {
$source = $data;
} elsif ($inMessage != 0) {
$inMessage = 1;
} else {
$str .= $data;
}
last if (not zmq_getsockopt($socket, ZMQ_RCVMORE));
}
$message{"source"} = $source;
$message{"message"} = $str;
return \%message;
}
1;
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment