root / exilog_sql.pm @ 226ad0a3c764c0606048acf7371b02765eee60d2

View | Annotate | Download (14.1 KB)

1
#!/usr/bin/perl
2
#
3
# This file is part of the exilog suite.
4
#
5
# http://duncanthrax.net/exilog/
6
#
7
# (c) Tom Kistner 2004
8
#
9
# See LICENSE for licensing information.
10
#
11
12
package exilog_sql;
13
use strict;
14
use DBI;
15
use exilog_config;
16
use exilog_util;
17
18
use Data::Dumper;
19
20
# Exporter configuration. Placed in a BEGIN block so the export lists are
# populated at compile time.
BEGIN {
  use Exporter;
  our ($VERSION, @ISA, @EXPORT, @EXPORT_OK, %EXPORT_TAGS);

  # Version number used for version checking.
  $VERSION = 0.1;

  # import() is inherited from Exporter.
  @ISA = ('Exporter');

  # Subroutines exported by default.
  @EXPORT = (
    '&reconnect',
    '&sql_select',
    '&sql_delete',
    '&sql_optimize',
    '&sql_count',
    '&sql_queue_add',
    '&sql_queue_update',
    '&sql_queue_delete',
    '&sql_queue_set_action',
    '&sql_queue_clear_action',
    '&write_message',
  );

  # No export tags and no optionally exported symbols.
  %EXPORT_TAGS = ();
  @EXPORT_OK   = ();
}
47
48
49
# open DB connection
50
my $dbh = DBI->connect(map { $config->{sql}->{$_} } qw(DBI user pass));
if (!$dbh) {
  # No handle means no subroutine in this module can work, so the
  # process is terminated while the module is still being loaded.
  print STDERR "[exilog_sql] Can't open exilog database.\n";
  exit(255);
};
55
56
# (Re-)establish the module-wide database connection held in $dbh.
#
# Arguments:
#   $conditional - optional flag. When true, the current connection is
#                  kept if it still answers a ping.
#
# Returns 1 if a connection is available afterwards, 0 if connecting
# failed (an error line is written to STDERR in that case).
sub reconnect {
  my $conditional = shift || 0;

  # After a failed reconnect $dbh is undefined, so it has to be checked
  # before ping() is called on it. The eval guards against drivers that
  # throw from ping() on a broken connection.
  if ($conditional && $dbh) {
    my $alive = eval { $dbh->ping };
    return 1 if ($alive);
  };

  # Best effort only: errors while closing a dead handle are ignored.
  eval {
    $dbh->disconnect() if ($dbh);
  };

  $dbh = DBI->connect($config->{sql}->{DBI}, $config->{sql}->{user}, $config->{sql}->{pass});
  unless ($dbh) {
    print STDERR "[exilog_sql] Can't open exilog database.\n";
    return 0;
  };

  return 1;
};
72
73
74
# --------------------------------------------------------------------------
75
# Generic Stubs, these are just frontends that call the backend-specific
76
# SQL subroutines for each database type.
77
# Look up and call the backend-specific implementation of a generic
# function. The backend subroutine name is built as
# "_<sql type>_<function>", e.g. "_mysql_sql_select".
#
# Arguments:
#   $function - generic function name, e.g. "sql_select"
#   @_        - remaining arguments, passed through unchanged
#
# Returns whatever the backend subroutine returns. Dies with a
# descriptive message if the configured SQL type has no implementation
# of the requested function (instead of Perl's bare "Undefined
# subroutine" error).
sub _call_backend {
  my $function = shift;

  my $type = $config->{sql}->{type};
  $type = '' unless (defined($type));

  my $backend = "_".$type."_".$function;
  my $code = __PACKAGE__->can($backend);
  unless ($code) {
    die "[exilog_sql] No '$function' implementation for SQL type '$type' ($backend is not defined).\n";
  };

  return $code->(@_);
};

# Store one log record. Arguments: server name, hash ref with the keys
# 'table' and 'data'. See the backend implementations for return codes.
sub write_message {
  return _call_backend('write_message', @_);
};

# Run a SELECT. Arguments: table, array ref of columns, criteria hash
# ref, order column, order direction, limit min, limit max, distinct.
sub sql_select {
  return _call_backend('sql_select', @_);
};

# Delete rows. Arguments: table, criteria hash ref.
sub sql_delete {
  return _call_backend('sql_delete', @_);
};

# Run the backend's table maintenance statement. Argument: table.
sub sql_optimize {
  return _call_backend('sql_optimize', @_);
};

# Insert a row into the queue table. Argument: hash ref of column values.
sub sql_queue_add {
  return _call_backend('sql_queue_add', @_);
};

# Update a queue row. Argument: hash ref of column values; its 'server'
# and 'message_id' entries select the row.
sub sql_queue_update {
  return _call_backend('sql_queue_update', @_);
};

# Delete queue rows by spool path. Argument: spool path.
sub sql_queue_delete {
  return _call_backend('sql_queue_delete', @_);
};

# Set the pending action of a queue entry.
# Arguments: server, message id, action.
# NOTE(review): only a MySQL backend for this function is visible in
# this file; with another SQL type the call dies in _call_backend.
sub sql_queue_set_action {
  return _call_backend('sql_queue_set_action', @_);
};

# Clear the pending action of a queue entry.
# Arguments: server, message id.
# NOTE(review): only a MySQL backend for this function is visible in
# this file; with another SQL type the call dies in _call_backend.
sub sql_queue_clear_action {
  return _call_backend('sql_queue_clear_action', @_);
};

# Count rows. Arguments: table, criteria hash ref.
sub sql_count {
  return _call_backend('sql_count', @_);
};
126
# --------------------------------------------------------------------------
127
128
129
# --------------------------------------------------------------------------
130
# PostgreSQL functions
131
# Count the rows of a table that match the given criteria.
#
# Arguments:
#   $where    - table name (interpolated into the statement as is)
#   $criteria - optional hash ref handed to _build_WHERE()
#
# Returns the row count. If the query fails, an error line is written to
# STDERR and 0 is returned.
sub _pgsql_sql_count {
  my $where = shift;
  my $criteria = shift || {};

  my $sql = "SELECT COUNT(*) FROM ".$where.
            ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" );

  # selectrow_array() prepares, executes, fetches and finishes the
  # statement in one call and returns an empty list on error, so no
  # undefined statement handle or row is ever dereferenced.
  my ($count) = $dbh->selectrow_array($sql);
  unless (defined($count)) {
    my $error = defined($dbh->errstr) ? $dbh->errstr : 'unknown error';
    print STDERR "[exilog_sql] COUNT query failed: $error (query: $sql)\n";
    return 0;
  };

  return $count;
};
145
146
# Delete all queue rows with the given spool path.
#
# Arguments:
#   $spool_path - value matched against the spool_path column
#
# Returns the result of DBI's do(): the number of affected rows ('0E0'
# for none) or undef on error.
sub _pgsql_sql_queue_delete {
  my $spool_path = shift;

  # An undefined path is bound as the empty string, which is what the
  # previous string-built statement compared against.
  $spool_path = '' unless (defined($spool_path));

  # The value is passed as a bind parameter so quotes or backslashes in
  # the path cannot alter the statement.
  return $dbh->do("DELETE FROM queue WHERE spool_path=?", undef, $spool_path);
};
151
152
# Update one row of the queue table.
#
# Arguments:
#   $hdr - hash ref of column => value. The 'server' and 'message_id'
#          entries identify the row, all other entries are written.
#
# Returns the result of DBI's do() (affected rows, '0E0' for none, undef
# on error). Returns nothing if $hdr is not a hash ref or contains no
# columns to update.
sub _pgsql_sql_queue_update {
  my $hdr = shift;

  return unless (ref($hdr) eq 'HASH');

  # Work on a shallow copy so the caller's hash keeps its 'server' and
  # 'message_id' entries and its original letter case.
  my %row = %{ $hdr };
  my $server = delete $row{server};
  my $message_id = delete $row{message_id};

  # PostgreSQL is case sensitive by default. To keep indexes working,
  # the columns in this list are lowercased before they are stored.
  my @lowercase = ( 'mailfrom', 'recipients_delivered', 'recipients_pending' );
  foreach my $col (@lowercase) {
    $row{$col} = lc($row{$col}) if (edt(\%row,$col));
  };

  # Without any column an "UPDATE queue SET  WHERE ..." statement would
  # be a syntax error, so there is nothing to do.
  my @columns = sort keys %row;
  return unless (@columns);

  # Values are passed as bind parameters. This replaces the manual quote
  # doubling and newline rewriting and also protects the WHERE values,
  # which used to be interpolated unescaped. Undefined values are bound
  # as empty strings, matching what the string-built statement stored.
  my $sql = "UPDATE queue SET ".join(",", map { $_."=?" } @columns).
            " WHERE message_id=? AND server=?";
  my @bind = map { defined($_) ? $_ : '' } (@row{@columns}, $message_id, $server);

  return $dbh->do($sql, undef, @bind);
};
181
182
# Insert one row into the queue table.
#
# Arguments:
#   $hdr - hash ref of column => value
#
# Returns the result of DBI's do() (affected rows or undef on error).
# Returns nothing if $hdr is not a hash ref or is empty.
sub _pgsql_sql_queue_add {
  my $hdr = shift;

  return unless (ref($hdr) eq 'HASH');

  # Work on a shallow copy so the caller's hash is not modified by the
  # lowercasing below.
  my %row = %{ $hdr };

  # PostgreSQL is case sensitive by default. To keep indexes working,
  # the columns in this list are lowercased before they are inserted.
  my @lowercase = ( 'mailfrom', 'recipients_delivered', 'recipients_pending' );
  foreach my $col (@lowercase) {
    $row{$col} = lc($row{$col}) if (edt(\%row,$col));
  };

  # An INSERT without columns is not valid, so there is nothing to do.
  my @fields = sort {$a cmp $b} keys %row;
  return unless (@fields);

  # Values are passed as bind parameters instead of being escaped by
  # hand. Undefined values are bound as empty strings, matching what the
  # string-built statement stored.
  my $sql = "INSERT INTO queue (".join(',',@fields).") VALUES(".join(',', ('?') x @fields).")";
  my @bind = map { defined($_) ? $_ : '' } @row{@fields};

  return $dbh->do($sql, undef, @bind);
};
207
208
# Run table maintenance on a PostgreSQL table.
#
# Arguments:
#   $where - table name, defaults to "nothing"
#
# Returns 1 on success. On failure an error line is written to STDERR
# and 0 is returned.
sub _pgsql_sql_optimize {
  my $where = shift || "nothing";

  # "OPTIMIZE TABLE" is MySQL syntax and is rejected by PostgreSQL.
  # VACUUM ANALYZE is the PostgreSQL statement that reclaims dead rows
  # and refreshes the planner statistics.
  # NOTE(review): VACUUM cannot run inside a transaction block, so this
  # assumes the handle runs with AutoCommit enabled - confirm.
  my $sql = "VACUUM ANALYZE ".$where;
  my $rc = $dbh->do($sql);
  unless (defined($rc)) {
    my $error = defined($dbh->errstr) ? $dbh->errstr : 'unknown error';
    print STDERR "[exilog_sql] Table maintenance failed: $error (query: $sql)\n";
    return 0;
  };

  return 1;
};
218
219
# Delete the rows of a table that match the given criteria.
#
# Arguments:
#   $where    - table name, defaults to "nothing"
#   $criteria - optional hash ref handed to _build_WHERE(). With an
#               empty hash no WHERE clause is added.
#
# Returns the number of deleted rows. If the statement fails, an error
# line is written to STDERR and 0 is returned.
sub _pgsql_sql_delete {
  my $where = shift || "nothing";
  my $criteria = shift || {};

  my $sql = "DELETE FROM ".$where.
            ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" );

  # do() returns undef on any failure, so neither an undefined statement
  # handle nor an undefined row count is used below.
  my $num = $dbh->do($sql);
  unless (defined($num)) {
    my $error = defined($dbh->errstr) ? $dbh->errstr : 'unknown error';
    print STDERR "[exilog_sql] DELETE failed: $error (query: $sql)\n";
    return 0;
  };

  # DBI reports "zero rows affected" as the true value '0E0'.
  return (($num eq '0E0') ? 0 : $num);
};
232
233
# Build and run a SELECT statement on PostgreSQL.
#
# Arguments (all but the first are optional):
#   $where           - table name
#   \@what           - columns to select, defaults to [ "*" ]
#   $criteria        - hash ref handed to _build_WHERE()
#   $order_by        - ORDER BY expression
#   $order_direction - sort direction, defaults to "DESC"
#   $limit_min       - with $limit_max: number of rows to skip;
#                      alone: maximum number of rows
#   $limit_max       - maximum number of rows
#   $distinct        - if defined (even when false), DISTINCT is used
#
# Returns the array ref of row hash refs produced by _fetch_multirow().
sub _pgsql_sql_select {
  my $where = shift;
  my @what = @{ (shift || [ "*" ]) };
  my $criteria = shift || {};
  my $order_by = shift || "";
  my $order_direction = shift || "DESC";
  my $limit_min = shift;
  my $limit_max = shift;
  my $distinct = shift;

  # PostgreSQL does not accept MySQL's "LIMIT offset,count" form, so the
  # pair is translated to "LIMIT count OFFSET offset". The values are
  # forced to integers so they cannot carry SQL fragments.
  my $limit = "";
  if (defined($limit_min) && defined($limit_max)) {
    $limit = " LIMIT ".int($limit_max)." OFFSET ".int($limit_min);
  }
  elsif (defined($limit_min)) {
    $limit = " LIMIT ".int($limit_min);
  }
  elsif (defined($limit_max)) {
    # Only a row count was given; previously this appended a stray
    # ",count" to the statement.
    $limit = " LIMIT ".int($limit_max);
  };

  my $sql = "SELECT ".
            (defined($distinct) ? "DISTINCT " : "").
            join(", ", @what).
            " FROM ".$where.
            ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" ).
            ($order_by ? " ORDER BY ".$order_by." ".$order_direction : "").
            $limit;

  return _fetch_multirow($where, $sql);
};
254
255
# Write one log record to a PostgreSQL database.
#
# Arguments:
#   $server - server name, defaults to 'default'
#   $h      - hash ref with the keys 'table' (target table name) and
#             'data' (hash ref of column => value)
#
# Returns:
#   1 - statement succeeded
#   2 - INSERT failed because the row already exists
#   0 - any other error (INSERT errors are reported on STDERR)
sub _pgsql_write_message {
  my $server = shift || 'default';
  my $h = shift;

  # Work on a shallow copy so the lowercasing below does not modify the
  # caller's data.
  my %data = %{ $h->{data} || {} };

  # PostgreSQL is case sensitive by default. To keep indexes working,
  # the columns in this list are lowercased before they are inserted.
  my @lowercase = ( 'mailfrom', 'rcpt', 'rcpt_final', 'host_dns', 'host_helo', 'host_rdns' );
  foreach my $col (@lowercase) {
    $data{$col} = lc($data{$col}) if (edt(\%data,$col));
  };

  # Special case: we only need to UPDATE the 'completed' field
  # in the messages table.
  if ( ($h->{table} eq 'messages') && (exists($data{completed})) ) {
    # All three values are bind parameters; they used to be interpolated
    # into the statement without any escaping.
    my @bind = map { defined($_) ? $_ : '' } ($data{completed}, $data{message_id}, $server);
    my $rc = $dbh->do("UPDATE messages SET completed=? WHERE message_id=? AND server=?", undef, @bind);
    return (defined($rc) ? 1 : 0);
  };

  my @fields = sort {$a cmp $b} keys %data;
  my @vals = ( $server );
  foreach my $field (@fields) {
    my $val = defined($data{$field}) ? $data{$field} : '';
    # Shorten the raw value to the 255 character limit. Because values
    # are bound rather than quoted by hand, truncation can no longer cut
    # an escape sequence in half, and trailing quote or backslash
    # characters no longer have to be stripped.
    push @vals, substr($val,0,255);
  };
  unshift @fields, 'server';

  my $sql = "INSERT INTO ".$h->{table}.' ("'.join('","',@fields).'") VALUES('.join(',', ('?') x @fields).")";
  my $rc = $dbh->do($sql, undef, @vals);
  return 1 if (defined($rc));

  # The driver's message for a unique key violation contains the word
  # "duplicate".
  my $errstr = $dbh->errstr;
  return 2 if (defined($errstr) && ($errstr =~ /duplicate/i));

  print STDERR "SQL Error (code ".$dbh->err.") on '$h->{table}' with query: $sql\n";
  return 0;
};
308
309
310
# --------------------------------------------------------------------------
311
# MySQL functions
312
# Count the rows of a table that match the given criteria.
#
# Arguments:
#   $where    - table name (interpolated into the statement as is)
#   $criteria - optional hash ref handed to _build_WHERE()
#
# Returns the row count. If the query fails, an error line is written to
# STDERR and 0 is returned.
sub _mysql_sql_count {
  my $where = shift;
  my $criteria = shift || {};

  my $sql = "SELECT COUNT(*) FROM ".$where.
            ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" );

  # selectrow_array() prepares, executes, fetches and finishes the
  # statement in one call and returns an empty list on error, so no
  # undefined statement handle or row is ever dereferenced.
  my ($count) = $dbh->selectrow_array($sql);
  unless (defined($count)) {
    my $error = defined($dbh->errstr) ? $dbh->errstr : 'unknown error';
    print STDERR "[exilog_sql] COUNT query failed: $error (query: $sql)\n";
    return 0;
  };

  return $count;
};
326
327
# Delete all queue rows with the given spool path.
#
# Arguments:
#   $spool_path - value matched against the spool_path column
#
# Returns the result of DBI's do(): the number of affected rows ('0E0'
# for none) or undef on error.
sub _mysql_sql_queue_delete {
  my $spool_path = shift;

  # An undefined path is bound as the empty string, which is what the
  # previous string-built statement compared against.
  $spool_path = '' unless (defined($spool_path));

  # The value is passed as a bind parameter so quotes or backslashes in
  # the path cannot alter the statement.
  return $dbh->do("DELETE FROM queue WHERE spool_path=?", undef, $spool_path);
};
332
333
# Update one row of the queue table.
#
# Arguments:
#   $hdr - hash ref of column => value. The 'server' and 'message_id'
#          entries identify the row, all other entries are written.
#
# Returns the result of DBI's do() (affected rows, '0E0' for none, undef
# on error). Returns nothing if $hdr is not a hash ref or contains no
# columns to update.
sub _mysql_sql_queue_update {
  my $hdr = shift;

  return unless (ref($hdr) eq 'HASH');

  # Work on a shallow copy so the caller's hash keeps its 'server' and
  # 'message_id' entries.
  my %row = %{ $hdr };
  my $server = delete $row{server};
  my $message_id = delete $row{message_id};

  # Without any column an "UPDATE queue SET  WHERE ..." statement would
  # be a syntax error, so there is nothing to do.
  my @columns = sort keys %row;
  return unless (@columns);

  # Values are passed as bind parameters. The manual quote doubling did
  # not cover backslashes, which MySQL treats as escape characters by
  # default, and the WHERE values were not escaped at all. Undefined
  # values are bound as empty strings, matching what the string-built
  # statement stored.
  my $sql = "UPDATE queue SET ".join(",", map { $_."=?" } @columns).
            " WHERE message_id=? AND server=?";
  my @bind = map { defined($_) ? $_ : '' } (@row{@columns}, $message_id, $server);

  return $dbh->do($sql, undef, @bind);
};
353
354
# Insert one row into the queue table.
#
# Arguments:
#   $hdr - hash ref of column => value
#
# Returns the result of DBI's do() (affected rows or undef on error).
# Returns nothing if $hdr is not a hash ref or is empty.
sub _mysql_sql_queue_add {
  my $hdr = shift;

  return unless (ref($hdr) eq 'HASH');

  # An INSERT without columns has nothing to store.
  my @fields = sort {$a cmp $b} keys(%{$hdr});
  return unless (@fields);

  # Values are passed as bind parameters. The manual quote doubling did
  # not cover backslashes, which MySQL treats as escape characters by
  # default. Undefined values are bound as empty strings, matching what
  # the string-built statement stored.
  my $sql = "INSERT INTO queue (".join(',',@fields).") VALUES(".join(',', ('?') x @fields).")";
  my @bind = map { defined($hdr->{$_}) ? $hdr->{$_} : '' } @fields;

  return $dbh->do($sql, undef, @bind);
};
370
371
# Store a pending action for one queue entry.
#
# Arguments:
#   $server     - server name of the queue entry
#   $message_id - message id of the queue entry
#   $action     - value written to the 'action' column
#
# Returns the result of DBI's do() (affected rows, '0E0' for none, undef
# on error).
sub _mysql_sql_queue_set_action {
  my $server = shift;
  my $message_id = shift;
  my $action = shift;

  # All values are bind parameters so they cannot alter the statement.
  # Undefined values are bound as empty strings, matching the previous
  # string-built statement.
  my @bind = map { defined($_) ? $_ : '' } ($action, $server, $message_id);

  return $dbh->do("UPDATE queue SET action=? WHERE server=? AND message_id=?", undef, @bind);
};
378
379
# Remove the pending action of one queue entry by setting its 'action'
# column to NULL.
#
# Arguments:
#   $server     - server name of the queue entry
#   $message_id - message id of the queue entry
#
# Returns the result of DBI's do() (affected rows, '0E0' for none, undef
# on error).
sub _mysql_sql_queue_clear_action {
  my $server = shift;
  my $message_id = shift;

  # Both values are bind parameters so they cannot alter the statement.
  # Undefined values are bound as empty strings, matching the previous
  # string-built statement.
  my @bind = map { defined($_) ? $_ : '' } ($server, $message_id);

  return $dbh->do("UPDATE queue SET action=NULL WHERE server=? AND message_id=?", undef, @bind);
};
385
386
387
# Issue an OPTIMIZE TABLE statement for one table.
#
# Arguments:
#   $where - table name; "nothing" is used when no true value is passed
#
# Always returns 1.
sub _mysql_sql_optimize {
  my ($table) = @_;
  $table ||= "nothing";

  my $statement = $dbh->prepare("OPTIMIZE TABLE ".$table);
  $statement->execute;
  $statement->finish;

  return 1;
};
397
398
# Delete the rows of a table that match the given criteria.
#
# Arguments:
#   $where    - table name, defaults to "nothing"
#   $criteria - optional hash ref handed to _build_WHERE(). With an
#               empty hash no WHERE clause is added.
#
# Returns the number of deleted rows. If the statement fails, an error
# line is written to STDERR and 0 is returned.
sub _mysql_sql_delete {
  my $where = shift || "nothing";
  my $criteria = shift || {};

  my $sql = "DELETE FROM ".$where.
            ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" );

  # do() returns undef on any failure, so neither an undefined statement
  # handle nor an undefined row count is used below.
  my $num = $dbh->do($sql);
  unless (defined($num)) {
    my $error = defined($dbh->errstr) ? $dbh->errstr : 'unknown error';
    print STDERR "[exilog_sql] DELETE failed: $error (query: $sql)\n";
    return 0;
  };

  # DBI reports "zero rows affected" as the true value '0E0'.
  return (($num eq '0E0') ? 0 : $num);
};
411
412
# Build and run a SELECT statement on MySQL.
#
# Arguments (all but the first are optional):
#   $where           - table name
#   \@what           - columns to select, defaults to [ "*" ]
#   $criteria        - hash ref handed to _build_WHERE()
#   $order_by        - ORDER BY expression
#   $order_direction - sort direction, defaults to "DESC"
#   $limit_min       - first LIMIT argument
#   $limit_max       - second LIMIT argument
#   $distinct        - if defined (even when false), DISTINCT is used
#
# Returns the array ref of row hash refs produced by _fetch_multirow().
sub _mysql_sql_select {
  my $where = shift;
  my @what = @{ (shift || [ "*" ]) };
  my $criteria = shift || {};
  my $order_by = shift || "";
  my $order_direction = shift || "DESC";
  my $limit_min = shift;
  my $limit_max = shift;
  my $distinct = shift;

  # The limits are forced to integers so they cannot carry SQL
  # fragments.
  my $limit = "";
  if (defined($limit_min)) {
    $limit = " LIMIT ".int($limit_min);
    $limit .= ",".int($limit_max) if (defined($limit_max));
  }
  elsif (defined($limit_max)) {
    # Only the second argument was given; previously this appended a
    # stray ",N" to the statement. It is used as a plain row count.
    $limit = " LIMIT ".int($limit_max);
  };

  my $sql = "SELECT ".
            (defined($distinct) ? "DISTINCT " : "").
            join(", ", @what).
            " FROM ".$where.
            ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" ).
            ($order_by ? " ORDER BY ".$order_by." ".$order_direction : "").
            $limit;

  return _fetch_multirow($where, $sql);
};
433
434
# Write one log record to a MySQL database.
#
# Arguments:
#   $server - server name, defaults to 'default'
#   $h      - hash ref with the keys 'table' (target table name) and
#             'data' (hash ref of column => value)
#
# Returns:
#   1 - statement succeeded
#   2 - INSERT failed with MySQL error 1062 (duplicate key)
#   0 - any other error (INSERT errors are reported on STDERR)
sub _mysql_write_message {
  my $server = shift || 'default';
  my $h = shift;

  my $data = $h->{data} || {};

  # Special case: we only need to UPDATE the 'completed' field
  # in the messages table.
  if ( ($h->{table} eq 'messages') && (exists($data->{completed})) ) {
    # All three values are bind parameters; they used to be interpolated
    # into the statement without any escaping.
    my @bind = map { defined($_) ? $_ : '' } ($data->{completed}, $data->{message_id}, $server);
    my $rc = $dbh->do("UPDATE messages SET completed=? WHERE message_id=? AND server=?", undef, @bind);
    return (defined($rc) ? 1 : 0);
  };

  my @fields = sort {$a cmp $b} keys(%{ $data });
  my @vals = ( $server );
  foreach my $field (@fields) {
    my $val = defined($data->{$field}) ? $data->{$field} : '';
    # Shorten the raw value to the 255 character limit. Because values
    # are bound rather than quoted by hand, truncation can no longer cut
    # an escape sequence in half, and trailing quote or backslash
    # characters no longer have to be stripped.
    push @vals, substr($val,0,255);
  };
  unshift @fields, 'server';

  my $sql = "INSERT INTO ".$h->{table}." (".join(',',@fields).") VALUES(".join(',', ('?') x @fields).")";
  my $rc = $dbh->do($sql, undef, @vals);
  return 1 if (defined($rc));

  # error 1062 means "Duplicate key".
  return 2 if ($dbh->err == 1062);

  print STDERR "SQL Error (code ".$dbh->err.") on '$h->{table}' with query: $sql\n";
  return 0;
};
479
480
481
# --------------------------------------------------------------------------
482
# misc subroutines used across several DB types
483
# Run a query and collect its result rows.
#
# Arguments:
#   $table - table name; not used by this subroutine
#   $sql   - complete SQL statement
#   $limit - optional maximum number of rows to collect; 0 or omitted
#            means no limit
#
# Returns an array ref with one hash ref (column => value) per row. If
# the statement cannot be prepared or executed, an error line is written
# to STDERR and an empty array ref is returned.
sub _fetch_multirow {
  my $table = shift;
  my $sql = shift;
  my $limit = shift || 0;

  # Named $rows rather than $a, which would shadow the variable that
  # sort blocks use.
  my $rows = [];

  # prepare() returns undef on failure; calling execute() on that would
  # die, so both steps are checked before any row is fetched.
  my $sh = $dbh->prepare($sql);
  unless ($sh && $sh->execute) {
    my $error = defined($dbh->errstr) ? $dbh->errstr : 'unknown error';
    print STDERR "[exilog_sql] Query failed: $error (query: $sql)\n";
    return $rows;
  };

  while (my $row = $sh->fetchrow_hashref) {
    push @{ $rows }, $row;
    # With a limit of 0 the counter goes negative and never hits zero,
    # which is what makes 0 mean "unlimited".
    last if (--$limit == 0);
  };
  $sh->finish;

  return $rows;
};
500
501
# Build a WHERE clause from a criteria hash.
#
# Arguments:
#   $criteria - hash ref of column => condition. Entries with an
#               undefined value are skipped. Conditions are read as:
#     * columns timestamp, completed, frozen, size: a string "MIN MAX"
#       (single space). Each bound that is present becomes
#       "col > MIN" / "col < MAX". An empty bound is skipped.
#     * array ref: exact match against any of the entries (OR).
#     * any other value: LIKE (ILIKE on pgsql) if it contains '%' or
#       '_', otherwise an exact match.
#
# Returns the string " WHERE " followed by all conditions joined with
# AND. Column names are used as given; all values are quoted through the
# database handle or validated as integers.
#
# NOTE(review): if every value is undefined the result is a bare
# " WHERE ", which is not valid SQL. This is left unchanged on purpose:
# returning an empty string instead would turn a filtered DELETE into a
# DELETE of the whole table.
sub _build_WHERE {
  my $criteria = shift || {};

  my %is_integer_col = map { $_ => 1 } qw(timestamp completed frozen size);

  # Condition that matches no row. Used where the input cannot be turned
  # into a valid condition, so the statement fails closed instead of
  # carrying broken or injected SQL.
  my $match_nothing = "1 = 0";

  my @set = ();
  foreach my $col (keys %{ $criteria }) {
    my $value = $criteria->{$col};
    next unless (defined($value));

    if ($is_integer_col{$col}) {
      # integer column
      my ($min,$max) = split / /,$value;

      foreach my $bound ([ '>', $min ], [ '<', $max ]) {
        my ($operator, $number) = @{ $bound };
        next unless (defined($number) && ($number ne ''));

        if ($number =~ /\A-?\d+\z/) {
          push @set, $col." ".$operator." ".$number;
        }
        else {
          push @set, $match_nothing;
        };
      };
    }
    elsif (ref($value) eq 'ARRAY') {
      # array ref, use exact string match with OR
      my @alternatives = map { $col." = ".$dbh->quote(defined($_) ? $_ : '') } @{ $value };

      # An empty list can match nothing; it used to produce " )".
      push @set, (@alternatives ? "( ".join(" OR ", @alternatives)." )" : $match_nothing);
    }
    else {
      # string column
      my $quoted = $dbh->quote($value);

      if ($value =~ /[%_]/) {
        # use ILIKE for PGSQL
        my $operator = ($config->{sql}->{type} eq 'pgsql') ? "ILIKE" : "LIKE";
        push @set, $col." ".$operator." ".$quoted;
      }
      else {
        push @set, $col." = ".$quoted;
      };
    };
  };

  return " WHERE ".join(" AND ", @set);
};
554
555
556
1;