Subversion Repositories ALCASAR

Rev

Blame | Last modification | View Log

<?php

namespace mbolli\nfsen_ng\datasources;

use mbolli\nfsen_ng\common\Config;
use mbolli\nfsen_ng\common\Debug;

class Akumuli implements Datasource {
    private readonly Debug $d;
    private $client;

    public function __construct() {
        $this->d = Debug::getInstance();
        $this->connect();
    }

    /**
     * connects to TCP socket.
     */
    public function connect(): void {
        try {
            $this->client = stream_socket_client('tcp://' . Config::$cfg['db']['akumuli']['host'] . ':' . Config::$cfg['db']['akumuli']['port'], $errno, $errmsg);

            if ($this->client === false) {
                throw new \Exception('Failed to connect to Akumuli: ' . $errmsg);
            }
        } catch (\Exception $e) {
            $this->d->dpr($e);
        }
    }

    /**
     * Convert data to redis-compatible string and write to Akumuli.
     */
    public function write(array $data): bool {
        $fields = array_keys($data['fields']);
        $values = array_values($data['fields']);

        // writes assume redis protocol. first byte identification:
        // "+" simple strings  "-" errors  ":" integers  "$" bulk strings  "*" array
        $query = '+' . implode('|', $fields) . ' source=' . $data['source'] . "\r\n"
            . '+' . $data['date_iso'] . "\r\n" // timestamp
            . '*' . \count($fields) . "\r\n"; // length of following array

        // add the $values corresponding to $fields
        foreach ($values as $v) {
            $query .= ':' . $v . "\r\n";
        }

        $this->d->dpr([$query]);

        // write redis-compatible string to socket
        fwrite($this->client, $query);

        return stream_get_contents($this->client);

        // to read:
        // curl localhost:8181/api/query -d "{'select':'flows'}"
    }

    public function __destruct() {
        if (\is_resource($this->client)) {
            fclose($this->client);
        }
    }

    /**
     * Gets data for plotting the graph in the frontend.
     * Each row in $return['data'] will be one line in the graph.
     * The lines can be
     *   * protocols - $sources must not contain more than one source (legend e.g. gateway_flows_udp, gateway_flows_tcp)
     *   * sources - $protocols must not contain more than one protocol (legend e.g. gateway_traffic_icmp, othersource_traffic_icmp).
     *
     * @param int    $start     timestamp
     * @param int    $end       timestamp
     * @param array  $sources   subset of sources specified in settings
     * @param array  $protocols UDP/TCP/ICMP/other
     * @param string $type      flows/packets/traffic
     *
     * @return array in the following format:
     *
     * $return = array(
     *  'start' => 1490484600,      // timestamp of first value
     *  'end' => 1490652000,        // timestamp of last value
     *  'step' => 300,              // resolution of the returned data in seconds. lowest value would probably be 300 = 5 minutes
     *  'data' => array(
     *      0 => array(
     *          'legend' => 'source_type_protocol',
     *          'data' => array(
     *          1490484600 => 33.998333333333,
     *          1490485200 => 37.005, ...
     *          )
     *      ),
     *      1 => array( e.g. gateway_flows_udp ...)
     *  )
     * );
     */
    public function get_graph_data(int $start, int $end, array $sources, array $protocols, array $ports, string $type = 'flows', string $display = 'sources'): array|string {
        // TODO: Implement get_graph_data() method.
        return [];
    }

    /**
     * Gets the timestamps of the first and last entry in the datasource (for this specific source).
     *
     * @return array (timestampfirst, timestamplast)
     */
    public function date_boundaries(string $source): array {
        // TODO: Implement date_boundaries() method.
        return [];
    }

    /**
     * Gets the timestamp of the last update of the datasource (for this specific source).
     */
    public function last_update(string $source, int $port = 0): int {
        // TODO: Implement last_update() method.
        return 0;
    }

    /**
     * Gets the path where the datasource's data is stored.
     */
    public function get_data_path(): string {
        // TODO: Implement get_data_path() method.
        return '';
    }

    /**
     * Removes all existing data for every source in $sources.
     * If $sources is empty, remove all existing data.
     */
    public function reset(array $sources): bool {
        // TODO: Implement reset() method.
        return true;
    }
}