Subversion Repositories ALCASAR

Rev

Details | Last modification | View Log

Rev Author Line No. Line
3241 rexy 1
<?php
2
 
3
namespace mbolli\nfsen_ng\common;
4
 
5
use mbolli\nfsen_ng\processor\Nfdump;
6
 
7
class Import {
8
    /** Debug helper obtained from Debug::getInstance(); used for all logging in this class. */
    private readonly Debug $d;

    /** True when PHP runs under the CLI SAPI; gates console output. */
    private readonly bool $cli;

    /** Verbosity flag; its initial value is handed to Debug::setDebug() by the constructor. */
    private bool $verbose = false;

    /** When true, start() resets the existing data before importing. */
    private bool $force = false;

    /** When true, start() prints no progress output and does not log the last update. */
    private bool $quiet = false;

    /** Whether port data is processed during an import. */
    private bool $processPorts = false;

    /** Whether port data is additionally processed per source. */
    private bool $processPortsBySource = false;

    /** When false, dbUpdatable() reports every file as importable without asking the database. */
    private bool $checkLastUpdate = false;

    /**
     * Detect the SAPI, fetch the shared Debug instance and apply the initial verbosity to it.
     */
    public function __construct() {
        $this->cli = \PHP_SAPI === 'cli';
        $this->d = Debug::getInstance();
        $this->d->setDebug($this->verbose);
    }
22
 
23
    /**
     * Import nfcapd data of every configured source, day by day, from $dateStart until today.
     *
     * In force mode the database is reset first. Without force mode, a source for which the
     * database reports a last update is resumed at the day of that update. Independent of the
     * mode, capture files whose file-name timestamp is not newer than a known last update are
     * skipped.
     *
     * Scanned layout: <profiles-data>/<profile>/<source>/<Y>/<m>/<d>/nfcapd.<12 digits>
     *
     * @param \DateTime $dateStart first day to import; it is only cloned, never modified
     *
     * @throws \Exception if the nfdump profile directory does not exist
     */
    public function start(\DateTime $dateStart): void {
        $sources = Config::$cfg['general']['sources'];
        $sourceCount = \count($sources);
        // console output is limited to non-quiet CLI runs; neither flag changes during the import
        $showProgress = $this->cli === true && $this->quiet === false;
        $processedSources = 0;

        // if in force mode, reset existing data
        if ($this->force === true) {
            if ($this->cli === true) {
                echo 'Resetting existing data...' . \PHP_EOL;
            }
            Config::$db->reset([]);
        }

        // start progress bar (CLI only): one step per day and source
        $daysTotal = ((int) $dateStart->diff(new \DateTime())->format('%a') + 1) * $sourceCount;
        if ($showProgress) {
            echo \PHP_EOL . \mbolli\nfsen_ng\vendor\ProgressBar::start($daysTotal, 'Processing ' . $sourceCount . ' sources...');
        }

        // process each source, e.g. gateway, mailserver, etc.
        foreach ($sources as $nr => $source) {
            $sourcePath = Config::$cfg['nfdump']['profiles-data'] . \DIRECTORY_SEPARATOR . Config::$cfg['nfdump']['profile'];
            if (!file_exists($sourcePath)) {
                throw new \Exception('Could not read nfdump profile directory ' . $sourcePath);
            }
            if ($showProgress) {
                echo \PHP_EOL . 'Processing source ' . $source . ' (' . ($nr + 1) . '/' . $sourceCount . ')...' . \PHP_EOL;
            }

            $date = clone $dateStart;

            // check if we want to continue a stopped import
            // assumes the last update of a source is similar to the last update of its ports...
            $lastUpdateDb = Config::$db->last_update($source);

            $lastUpdate = null;
            if ($lastUpdateDb !== false && $lastUpdateDb !== 0) {
                $lastUpdate = (new \DateTime())->setTimestamp($lastUpdateDb);
            }

            if ($this->force === false && $lastUpdate !== null) {
                // days already imported do not need to be shown in the progress bar
                $daysSaved = (int) $date->diff($lastUpdate)->format('%a');
                $daysTotal -= $daysSaved;
                if ($this->quiet === false) {
                    $this->d->log('Last update: ' . $lastUpdate->format('Y-m-d H:i'), \LOG_INFO);
                }
                if ($showProgress) {
                    \mbolli\nfsen_ng\vendor\ProgressBar::setTotal($daysTotal);
                }

                // set progress to the date when the import was stopped
                $date->setTimestamp($lastUpdateDb);
                $date->setTimezone(new \DateTimeZone(date_default_timezone_get()));
            }

            // iterate from $dateStart (or the resume date) until today
            while ((int) $date->format('Ymd') <= (int) (new \DateTime())->format('Ymd')) {
                $scan = [$sourcePath, $source, $date->format('Y'), $date->format('m'), $date->format('d')];
                $scanPath = implode(\DIRECTORY_SEPARATOR, $scan);

                // set date to tomorrow for next iteration
                $date->modify('+1 day');

                // if no data exists for current date  (e.g. .../2017/03/03)
                if (!file_exists($scanPath)) {
                    $this->d->dpr($scanPath . ' does not exist!');
                    if ($showProgress) {
                        echo \mbolli\nfsen_ng\vendor\ProgressBar::next(1);
                    }
                    continue;
                }

                // scan path
                $this->d->log('Scanning path ' . $scanPath, \LOG_INFO);
                $scanFiles = scandir($scanPath);

                if ($showProgress) {
                    echo \mbolli\nfsen_ng\vendor\ProgressBar::next(1, 'Scanning ' . $scanPath . '...');
                }

                // scandir() returns false on failure (e.g. path is not a readable directory)
                if ($scanFiles === false) {
                    $this->d->log('Could not scan path ' . $scanPath, \LOG_WARNING);
                    continue;
                }

                foreach ($scanFiles as $file) {
                    if (\in_array($file, ['.', '..'], true)) {
                        continue;
                    }

                    // parse date of file name to compare against last_update
                    if (preg_match('/nfcapd\.([0-9]{12})$/', (string) $file, $fileDate) !== 1) {
                        $this->d->log('Caught exception: Bad file name format of nfcapd file: ' . $file, \LOG_DEBUG);
                        continue;
                    }
                    $fileDatetime = new \DateTime($fileDate[1]);

                    // skip files that are not newer than a known last update
                    if ($lastUpdate !== null && $fileDatetime <= $lastUpdate) {
                        continue;
                    }

                    // let nfdump parse each nfcapd file; the path is relative: <Y>/<m>/<d>/<file>
                    $statsPath = implode(\DIRECTORY_SEPARATOR, \array_slice($scan, 2, 5)) . \DIRECTORY_SEPARATOR . $file;

                    try {
                        // fill source.rrd
                        $this->writeSourceData($source, $statsPath);

                        // write general port data (queries data for all sources, should only be executed when data for all sources exists...)
                        if ($this->processPorts === true && $nr === $sourceCount - 1) {
                            $this->writePortsData($statsPath);
                        }

                        // if enabled, process ports per source as well (source_80.rrd)
                        if ($this->processPortsBySource === true) {
                            $this->writePortsData($statsPath, $source);
                        }
                    } catch (\Exception $e) {
                        // a single failing file must not abort the whole import
                        $this->d->log('Caught exception: ' . $e->getMessage(), \LOG_WARNING);
                    }
                }
            }
            ++$processedSources;
        }
        if ($processedSources === 0) {
            $this->d->log('Import did not process any sources.', \LOG_WARNING);
        }
        if ($showProgress) {
            echo \mbolli\nfsen_ng\vendor\ProgressBar::finish();
        }
    }
157
 
158
    /**
     * Query nfdump for the summary statistics (-I) of one capture file and write the
     * flows/packets/bytes values of the given source to the database.
     *
     * @param string $source    source name, passed to nfdump via -M
     * @param string $statsPath capture file path, passed to nfdump via -r; its last 12
     *                          characters are parsed as the timestamp of the data point
     *
     * @return bool true after a successful write; false if the database is not updatable
     *              for this file or nfdump failed
     *
     * @throws \Exception if the database write fails or the timestamp cannot be parsed
     */
    private function writeSourceData(string $source, string $statsPath): bool {
        // set options and get netflow summary statistics (-I)
        $nfdump = Nfdump::getInstance();
        $nfdump->reset();
        $nfdump->setOption('-I', null);
        $nfdump->setOption('-M', $source);
        $nfdump->setOption('-r', $statsPath);

        if ($this->dbUpdatable($statsPath, $source) === false) {
            return false;
        }

        try {
            $input = $nfdump->execute();
        } catch (\Exception $e) {
            $this->d->log('Exception: ' . $e->getMessage(), \LOG_WARNING);

            return false;
        }

        $date = new \DateTime(mb_substr($statsPath, -12));
        $data = [
            'fields' => [],
            'source' => $source,
            'port' => 0,
            'date_iso' => $date->format('Ymd\THis'),
            'date_timestamp' => $date->getTimestamp(),
        ];

        // $input data is an array of lines looking like this:
        // flows_tcp: 323829
        foreach ($input as $i => $line) {
            if (!\is_string($line)) {
                // nothing to parse in a non-string entry, so skip it instead of casting it
                $this->d->log('Got no output of previous command', \LOG_DEBUG);
                continue;
            }
            if ($i === 0) {
                continue;
            } // skip nfdump command

            // split into "<type>: <value>"; anything without that separator is not a
            // statistics line (e.g. an error message) and is skipped
            $parts = explode(': ', $line, 2);
            if (\count($parts) !== 2) {
                continue;
            }
            [$type, $value] = $parts;

            // we only need flows/packets/bytes values, the source and the timestamp
            if (preg_match('/^(flows|packets|bytes)/i', $type)) {
                $data['fields'][mb_strtolower($type)] = (int) $value;
            }
        }

        // write to database
        if (Config::$db->write($data) === false) {
            throw new \Exception('Error writing to ' . $statsPath);
        }

        return true;
    }
216
 
217
    /**
     * Write the port statistics of one capture file for every configured port.
     *
     * @param string $statsPath capture file path handed on to writePortData()
     * @param string $source    source to restrict the data to; '' is handed on unchanged
     *
     * @return bool always true; failures surface as exceptions from writePortData()
     *
     * @throws \Exception
     */
    private function writePortsData(string $statsPath, string $source = ''): bool {
        foreach (Config::$cfg['general']['ports'] as $configuredPort) {
            $this->writePortData($configuredPort, $statsPath, $source);
        }

        return true;
    }
229
 
230
    /**
     * Query nfdump for the destination-port statistics of one port in one capture file
     * and write the per-protocol and overall flows/packets/bytes to the database.
     *
     * @param int    $port      destination port to filter on
     * @param string $statsPath capture file path, passed to nfdump via -r; its last 12
     *                          characters are parsed as the timestamp of the data point
     * @param string $source    source to query; when empty() all configured sources are queried
     *
     * @return bool true after a successful write; false if the database is not updatable
     *              for this file or nfdump failed
     *
     * @throws \Exception if the database write fails or the timestamp cannot be parsed
     */
    private function writePortData(int $port, string $statsPath, string $source = ''): bool {
        $allSources = Config::$cfg['general']['sources'];

        // set options and get netflow statistics
        $nfdump = Nfdump::getInstance();
        $nfdump->reset();

        // no source given: query all sources at once and check the source-less data set
        if (empty($source)) {
            $nfdump->setOption('-M', implode(':', $allSources));
            $updatable = $this->dbUpdatable($statsPath, '', $port);
        } else {
            $nfdump->setOption('-M', $source);
            $updatable = $this->dbUpdatable($statsPath, $source, $port);
        }
        if ($updatable === false) {
            return false;
        }

        $nfdump->setFilter('dst port ' . $port);
        $nfdump->setOption('-s', 'dstport:p');
        $nfdump->setOption('-r', $statsPath);

        try {
            $rows = $nfdump->execute();
        } catch (\Exception $e) {
            $this->d->log('Exception: ' . $e->getMessage(), \LOG_WARNING);

            return false;
        }

        // parse and turn into usable data
        $timestamp = new \DateTime(mb_substr($statsPath, -12));
        $data = [
            'fields' => [
                'flows' => 0,
                'packets' => 0,
                'bytes' => 0,
            ],
            'source' => $source,
            'port' => $port,
            'date_iso' => $timestamp->format('Ymd\THis'),
            'date_timestamp' => $timestamp->getTimestamp(),
        ];

        // column index of each metric within a row
        // headers: ts,te,td,pr,val,fl,flP,ipkt,ipktP,ibyt,ibytP,ipps,ipbs,ibpp
        $metricColumns = ['flows' => 5, 'packets' => 7, 'bytes' => 9];

        // process protocols
        foreach ($rows as $row) {
            // skip anything uncountable, anything without the 14 columns, and the header row
            $countable = \is_array($row) || $row instanceof \Countable;
            if (!$countable || \count($row) !== 14 || $row[0] === 'ts') {
                continue;
            }

            $protocol = mb_strtolower((string) $row[3]);

            foreach ($metricColumns as $metric => $column) {
                $amount = (int) $row[$column];

                // protocol-specific value, e.g. flows_tcp
                $data['fields'][$metric . '_' . $protocol] = $amount;

                // overall stats
                $data['fields'][$metric] += $amount;
            }
        }

        // write to database
        if (Config::$db->write($data) === false) {
            throw new \Exception('Error writing to ' . $statsPath);
        }

        return true;
    }
313
 
314
    /**
     * Import a single nfcapd file.
     *
     * Applies the same port switches as the per-file handling in start(): general port
     * data is written only when port processing is enabled, and per-source port data only
     * when per-source port processing is enabled. Exceptions raised while writing are
     * caught and logged as warnings, so this method does not throw them.
     *
     * @param string $file   capture file path, handed to nfdump via -r
     * @param string $source source the file belongs to
     * @param bool   $last   when true, the general (all sources) port data is written too;
     *                       presumably set for the last source of a capture interval,
     *                       confirm against the caller
     */
    public function importFile(string $file, string $source, bool $last): void {
        try {
            $this->d->log('Importing file ' . $file . ' (' . $source . '), last=' . (int) $last, \LOG_INFO);

            // fill source.rrd
            $this->writeSourceData($source, $file);

            // write general port data (not depending on source, so only executed per port)
            if ($this->processPorts === true && $last === true) {
                $this->writePortsData($file);
            }

            // if enabled, process ports per source as well (source_80.rrd)
            if ($this->processPortsBySource === true) {
                $this->writePortsData($file, $source);
            }
        } catch (\Exception $e) {
            $this->d->log('Caught exception: ' . $e->getMessage(), \LOG_WARNING);
        }
    }
337
 
338
    /**
     * Check if db is free to update (some databases only allow inserting data at the end).
     *
     * Without the last-update check enabled every file is reported as importable. Otherwise
     * the timestamp in the file name must be newer than the last update the database reports
     * for the given source and port; a last update of 0 or false counts as "no data yet".
     *
     * @param string $file   capture file path ending in nfcapd.<12 digits>
     * @param string $source source name handed to the database lookup
     * @param int    $port   port handed to the database lookup
     *
     * @return bool true if the file may be imported; false if its name carries no timestamp
     *              or it is not newer than the last update
     *
     * @throws \Exception if the timestamp of the file name cannot be parsed
     */
    public function dbUpdatable(string $file, string $source = '', int $port = 0): bool {
        if ($this->checkLastUpdate === false) {
            return true;
        }

        // parse capture file's datetime. can't use filemtime as we need the datetime in the file name.
        $match = [];
        if (preg_match('/nfcapd\.([0-9]{12})$/', $file, $match) !== 1) {
            return false;
        } // nothing to import

        $fileDatetime = new \DateTime($match[1]);

        // get last updated time from database
        $lastUpdateDb = Config::$db->last_update($source, $port);

        // no previous update known: nothing can collide, so the file is importable
        if ($lastUpdateDb === false || $lastUpdateDb === 0) {
            return true;
        }

        $lastUpdate = (new \DateTime())->setTimestamp($lastUpdateDb);

        // prevent attempt to import the same file again
        return $fileDatetime > $lastUpdate;
    }
367
 
368
    /**
     * Store the verbosity flag and switch debug output on when it is enabled.
     *
     * Passing false only updates the flag; debug output is never switched off here.
     */
    public function setVerbose(bool $verbose): void {
        $this->verbose = $verbose;

        if ($verbose) {
            $this->d->setDebug(true);
        }
    }
374
 
375
    /**
     * Enable or disable the processing of port data.
     */
    public function setProcessPorts(bool $processPorts): void {
        $this->processPorts = $processPorts;
    }
378
 
379
    /**
     * Enable or disable force mode, in which start() resets existing data before importing.
     */
    public function setForce(bool $force): void {
        $this->force = $force;
    }
382
 
383
    /**
     * Enable or disable quiet mode, in which start() prints no progress output.
     */
    public function setQuiet(bool $quiet): void {
        $this->quiet = $quiet;
    }
386
 
387
    /**
     * Enable or disable the additional processing of port data per source.
     *
     * The parameter is declared as bool, matching the other setters of this class and
     * the bool property it is assigned to.
     */
    public function setProcessPortsBySource(bool $processPortsBySource): void {
        $this->processPortsBySource = $processPortsBySource;
    }
390
 
391
    /**
     * Enable or disable the last-update check performed by dbUpdatable().
     */
    public function setCheckLastUpdate(bool $checkLastUpdate): void {
        $this->checkLastUpdate = $checkLastUpdate;
    }
394
}