Commit 960712b

Alexey Portnov authored and committed
fix(cdr): recover from missing CDR tables and enable WAL (#1000, #1019)
WorkerCallEvents/WorkerCdr crash-loop with PDOException "no such table: cdr" (Sentry #27651, 2600+ events on 3 servers) when the CDR database file is recreated empty after corruption or a storage remount.

Adds runtime recovery via DatabaseProviderBase::ensureCdrTables() called at both worker startups, serialised through MutexProvider so the parallel WorkerSafeScripts Fiber restarts cannot race on createUpdateDbTableByAnnotations(). Wraps the known crash points (InsertDataToDB::execute, UpdateDataInDB::execute, ActionHangupChan::execute, CallDetailRecordsTmp::afterSave move-to-cdr_general, and WorkerCallEvents::deleteOldRecords) in try/catch so a missing table no longer takes the whole worker down. deleteOldRecords also parameterises its DELETE and short-circuits via tableExists() to avoid spamming Sentry every minute when recovery failed.

Closes #1019: PRAGMA journal_mode=WAL, busy_timeout=5000, synchronous=NORMAL, cache_size and temp_store now apply to CDRDatabaseProvider and RecordingStorageDatabaseProvider in addition to MainDatabaseProvider. With WAL, REST API readers (WorkerApiCommands x3, AGI CdrDb.php, voicemail-sender) no longer block the WorkerCallEvents writer. WAL is gated on isWalSafeFilesystem() which reads /proc/mounts and falls back to rollback journal on NFS/CIFS/SSHFS/davfs/glusterfs/ceph/lustre/gpfs (WAL shared-memory mmap is unsafe on network filesystems).

DeleteRecordAction now dispatches deletion through a new WorkerCdr::DELETE_CDR_TUBE Beanstalk queue handled by DeleteCDR inside WorkerCallEvents, restoring the single-writer contract on cdr.db and preventing parallel writes between WorkerApiCommands and the call event worker. The action keeps its input validation in the REST context, checks BeanstalkClient::isConnected() (the constructor swallows connect errors), uses a 60s worker timeout for cascade deletes on slow USB storage, and forwards the request as a single sendRequest.

DeleteCDR carries the previously-private deletion helpers, JSON-encodes the audit context to prevent log injection, uses JSON_THROW_ON_ERROR | JSON_INVALID_UTF8_SUBSTITUTE for replies, and refuses to unlink recording files outside Directories::AST_MONITOR_DIR via realpath + base prefix check.

Verification on a live mikopbx container:
- DROP TABLE cdr; cdr_general → SIGUSR1 → WorkerSafeScripts restart rebuilds both tables via annotations and only one of the two parallel workers logs "CDR tables missing" (mutex confirmed).
- cdr.db PRAGMA journal_mode = wal after recovery; -wal/-shm files present.
- DELETE /cdr/<id> single, DELETE /cdr/mikopbx-* cascade (3 records), 404 on missing id, 400 on invalid format, 400 on empty id all return correct httpCode and JSON shape.
- No PDOException, "no such table", or "Refused to delete" entries in syslog after the cycle.
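
The DeleteCDR file itself is not among the diffs shown on this page, so the recording-file guard described above (realpath plus base prefix check) can only be illustrated. The following is a minimal sketch of that kind of check, not the committed code: the helper name `isInsideBaseDir()` and its parameters are hypothetical, and the base directory is passed in rather than read from `Directories::AST_MONITOR_DIR`.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not from the commit): returns true only when $path
 * resolves to an existing filesystem entry located under $baseDir.
 */
function isInsideBaseDir(string $path, string $baseDir): bool
{
    // realpath() resolves symlinks and ".." segments and returns false
    // when the target does not exist.
    $realBase = realpath($baseDir);
    $realPath = realpath($path);
    if ($realBase === false || $realPath === false) {
        return false;
    }

    // Append a separator so that "/base/dir-evil" does not match "/base/dir".
    $prefix = rtrim($realBase, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;

    return str_starts_with($realPath, $prefix);
}
```

A caller would run this check immediately before `unlink()` and log a refusal when it returns false. Comparing resolved paths, rather than the raw request string, is what defeats `../` traversal and symlinks that point outside the base directory.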
1 parent e26e6b9 commit 960712b

9 files changed

Lines changed: 647 additions & 282 deletions

src/Common/Models/CallDetailRecordsTmp.php

Lines changed: 15 additions & 9 deletions
@@ -89,17 +89,23 @@ public function afterSave(): void
         // If work is completed (value of 'work_completed' is '1') and should be moved to general CDR,
         // create a new CallDetailRecords instance and copy relevant attributes.
         if ($work_completed === '1' && $moveToGeneral) {
-            $newCdr = new CallDetailRecords();
-            $vars = $this->toArray();
-            foreach ($vars as $key => $value) {
-                if ('id' === $key) {
-                    continue;
-                }
-                if (property_exists($newCdr, $key)) {
-                    $newCdr->writeAttribute($key, $value);
+            try {
+                $newCdr = new CallDetailRecords();
+                $vars = $this->toArray();
+                foreach ($vars as $key => $value) {
+                    if ('id' === $key) {
+                        continue;
+                    }
+                    if (property_exists($newCdr, $key)) {
+                        $newCdr->writeAttribute($key, $value);
+                    }
                 }
+                $newCdr->save();
+            } catch (Throwable $e) {
+                // Prevent crash loop when cdr_general table is missing (issue #1000).
+                // The temporary record remains in `cdr` and will be retried by WorkerCdr.
+                CriticalErrorsHandler::handleExceptionWithSyslog($e);
             }
-            $newCdr->save();
         }
         $this->saveCdrCache();
     }

src/Common/Providers/DatabaseProviderBase.php

Lines changed: 165 additions & 7 deletions
@@ -22,13 +22,18 @@
 namespace MikoPBX\Common\Providers;


+use MikoPBX\Common\Models\CallDetailRecords;
+use MikoPBX\Common\Models\CallDetailRecordsTmp;
+use MikoPBX\Core\System\SystemMessages;
 use MikoPBX\Core\System\System;
+use MikoPBX\Core\System\Upgrade\UpdateDatabase;
 use MikoPBX\Core\System\Util;
 use Phalcon\Di\Di;
 use Phalcon\Di\DiInterface;
 use Phalcon\Events\Manager as EventsManager;
 use Phalcon\Logger\Adapter\Stream as FileLogger;
 use Phalcon\Logger\Logger;
+use Throwable;

 /**
  * Main database connection is created based in the parameters defined in the configuration file
@@ -99,21 +104,38 @@ function () use ($dbConfig, $serviceName) {
                  */
                 $connection->setNestedTransactionsWithSavepoints(false);

-                if (!System::isBooting() && $serviceName === MainDatabaseProvider::SERVICE_NAME) {
+                $optimizedServices = [
+                    MainDatabaseProvider::SERVICE_NAME,
+                    CDRDatabaseProvider::SERVICE_NAME,
+                    RecordingStorageDatabaseProvider::SERVICE_NAME,
+                ];
+                if (!System::isBooting() && in_array($serviceName, $optimizedServices, true)) {
                     // Optimize SQLite for better concurrency
                     // Set busy timeout to 5 seconds - wait for lock instead of immediate failure
                     $connection->execute("PRAGMA busy_timeout = 5000");
-
-                    // Keep WAL mode for better concurrency (already set, but ensure it)
-                    $connection->execute("PRAGMA journal_mode = WAL");
-
+
+                    // WAL: readers don't block writers and vice versa, only one writer at a time.
+                    // Required for CDR database where WorkerCallEvents (writer) runs in parallel
+                    // with WorkerApiCommands (REST API readers) and Asterisk AGI CdrDb.php.
+                    // Skip on network filesystems (NFS/SMB/SSHFS) where WAL shared-memory is
+                    // unsafe and can corrupt the database — fall back to rollback journal.
+                    if (self::isWalSafeFilesystem($dbConfig['dbfile'])) {
+                        $connection->execute("PRAGMA journal_mode = WAL");
+                    } else {
+                        SystemMessages::sysLogMsg(
+                            self::class,
+                            "WAL disabled for {$serviceName}: filesystem under {$dbConfig['dbfile']} is not WAL-safe",
+                            LOG_NOTICE
+                        );
+                    }
+
                     // Use NORMAL synchronous mode for better performance while maintaining durability
                     // FULL is very safe but slower, NORMAL is good balance
                     $connection->execute("PRAGMA synchronous = NORMAL");
-
+
                     // // Increase cache size to 10MB for better performance
                     $connection->execute("PRAGMA cache_size = -10000");
-
+
                     // Use memory for temp tables
                     $connection->execute("PRAGMA temp_store = MEMORY");
                 }
@@ -168,6 +190,142 @@ function ($event, $connection) use ($logger) {
         $connection->setEventsManager($eventsManager);
     }

+    /**
+     * Ensures CDR tables (`cdr` and `cdr_general`) exist in the CDR database.
+     *
+     * Called at worker startup to recover from corruption or storage remount
+     * scenarios that cause the CDR database file to be recreated empty.
+     * Without this, WorkerCallEvents and WorkerCdr crash-loop with
+     * "no such table: cdr" (Sentry #27651, GitHub issue #1000).
+     *
+     * IMPORTANT: this method MUST only be called from a worker startup path,
+     * before any Phalcon model instance has been created in the current process.
+     * `recreateDBConnections()` re-registers DI services and invalidates any
+     * model instances that already cached a connection. Calling this from a
+     * mid-request context will silently break previously-loaded models.
+     *
+     * The static guard prevents accidental re-entry in the same process.
+     * The mutex serialises parallel `WorkerCallEvents`/`WorkerCdr` startup
+     * across processes (`WorkerSafeScriptsCore` restarts both via Fibers).
+     *
+     * All errors are caught and logged — recovery failures must not prevent
+     * the worker from starting, otherwise we trade a crash loop for a silent
+     * refusal to start.
+     */
+    public static function ensureCdrTables(): void
+    {
+        static $alreadyRun = false;
+        if ($alreadyRun) {
+            return;
+        }
+        $alreadyRun = true;
+
+        try {
+            $di = Di::getDefault();
+            if ($di === null) {
+                return;
+            }
+
+            $recovery = static function () use ($di): void {
+                /** @var \Phalcon\Db\Adapter\Pdo\Sqlite $connection */
+                $connection = $di->get(CDRDatabaseProvider::SERVICE_NAME);
+                if ($connection->tableExists('cdr') && $connection->tableExists('cdr_general')) {
+                    return;
+                }
+
+                SystemMessages::sysLogMsg(
+                    self::class,
+                    'CDR tables missing, recreating from model annotations',
+                    LOG_WARNING
+                );
+
+                $dbUpdater = new UpdateDatabase();
+                $dbUpdater->createUpdateDbTableByAnnotations(CallDetailRecordsTmp::class);
+                $dbUpdater->createUpdateDbTableByAnnotations(CallDetailRecords::class);
+
+                self::recreateDBConnections();
+            };
+
+            try {
+                /** @var \MikoPBX\Common\Providers\MutexProvider|object $mutex */
+                $mutex = $di->get(MutexProvider::SERVICE_NAME);
+                $mutex->synchronized('cdr-tables-recovery', $recovery, 30, 60);
+            } catch (Throwable $mutexError) {
+                // Mutex provider unavailable (e.g. Redis down at boot) — fall
+                // back to direct execution. SQLite's busy_timeout still
+                // serialises parallel CREATE TABLE attempts.
+                SystemMessages::sysLogMsg(
+                    self::class,
+                    'CDR recovery mutex unavailable, falling back: ' . $mutexError->getMessage(),
+                    LOG_NOTICE
+                );
+                $recovery();
+            }
+        } catch (Throwable $e) {
+            SystemMessages::sysLogMsg(
+                self::class,
+                'CDR tables recovery failed: ' . $e->getMessage(),
+                LOG_ERR
+            );
+        }
+    }
+
+    /**
+     * Determines whether the filesystem hosting the given DB file supports
+     * SQLite WAL safely. WAL relies on shared-memory mmap and atomic fsync
+     * which break on network filesystems (NFS/SMB/SSHFS) and may silently
+     * corrupt the database.
+     *
+     * Detection is best-effort: we read `/proc/mounts`, find the longest
+     * matching mount point for the given path, and reject known-unsafe
+     * filesystem types. On any error or unknown FS we default to ALLOWING
+     * WAL — that matches the previous behavior for MainDatabaseProvider
+     * and avoids surprising regressions on exotic local filesystems.
+     */
+    private static function isWalSafeFilesystem(string $dbFile): bool
+    {
+        $unsafeFsTypes = [
+            'nfs', 'nfs4', 'cifs', 'smb', 'smbfs', 'smb2', 'smb3',
+            'sshfs', 'davfs', 'glusterfs', 'ceph', 'lustre', 'gpfs',
+        ];
+
+        try {
+            $mountsContent = @file_get_contents('/proc/mounts');
+            if ($mountsContent === false || $mountsContent === '') {
+                return true;
+            }
+
+            $bestMatch = '';
+            $bestType = '';
+            foreach (explode("\n", $mountsContent) as $line) {
+                $parts = preg_split('/\s+/', trim($line));
+                if (!is_array($parts) || count($parts) < 3) {
+                    continue;
+                }
+                $mountPoint = $parts[1];
+                $fsType = $parts[2];
+                if ($mountPoint === '') {
+                    continue;
+                }
+                $needle = ($mountPoint === '/') ? '/' : $mountPoint . '/';
+                if (str_starts_with($dbFile . '/', $needle)
+                    && strlen($mountPoint) > strlen($bestMatch)
+                ) {
+                    $bestMatch = $mountPoint;
+                    $bestType = $fsType;
+                }
+            }
+
+            if ($bestType === '') {
+                return true;
+            }
+
+            return !in_array(strtolower($bestType), $unsafeFsTypes, true);
+        } catch (Throwable) {
+            return true;
+        }
+    }
+
     /**
      * Recreate DB connections after table structure changes
      */
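
The longest-prefix mount lookup used by `isWalSafeFilesystem()` can be tried in isolation. The sketch below is a simplified reimplementation for illustration, not the committed method: it takes the mount table as a string instead of reading `/proc/mounts`, and the function name `fsTypeForPath()`, the sample mount table, and the sample paths are all hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: returns the filesystem type of the most specific
 * mount point containing $path, or '' when no entry matches.
 */
function fsTypeForPath(string $path, string $mountsContent): string
{
    $bestMatch = '';
    $bestType  = '';
    foreach (explode("\n", $mountsContent) as $line) {
        // Fields in /proc/mounts format: device, mount point, fs type, options, ...
        $parts = preg_split('/\s+/', trim($line));
        if (!is_array($parts) || count($parts) < 3 || $parts[1] === '') {
            continue;
        }
        $mountPoint = $parts[1];
        // The trailing slash keeps "/storage/usbdisk1" from matching "/storage/usbdisk10".
        $needle = ($mountPoint === '/') ? '/' : $mountPoint . '/';
        if (str_starts_with($path . '/', $needle) && strlen($mountPoint) > strlen($bestMatch)) {
            $bestMatch = $mountPoint;
            $bestType  = $parts[2];
        }
    }

    return $bestType;
}

// Assumed sample mount table: local root, local USB disk, NFS share nested inside it.
$mounts = <<<MOUNTS
overlay / overlay rw 0 0
/dev/sdb1 /storage/usbdisk1 ext4 rw 0 0
nas:/export /storage/usbdisk1/remote nfs4 rw 0 0
MOUNTS;

echo fsTypeForPath('/storage/usbdisk1/cdr.db', $mounts), PHP_EOL;        // ext4
echo fsTypeForPath('/storage/usbdisk1/remote/cdr.db', $mounts), PHP_EOL; // nfs4
```

Picking the longest matching mount point matters because mounts nest: a database under an NFS share mounted inside a local disk must be classified by the inner mount, not the outer one.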

src/Core/Workers/Libs/WorkerCallEvents/ActionHangupChan.php

Lines changed: 16 additions & 9 deletions
@@ -20,12 +20,14 @@

 namespace MikoPBX\Core\Workers\Libs\WorkerCallEvents;

+use MikoPBX\Common\Handlers\CriticalErrorsHandler;
 use MikoPBX\Common\Models\CallDetailRecordsTmp;
 use MikoPBX\Core\Asterisk\AsteriskManager;
 use MikoPBX\Core\Asterisk\Configs\VoiceMailConf;
 use MikoPBX\Core\System\SystemMessages;
 use MikoPBX\Core\System\Util;
 use MikoPBX\Core\Workers\WorkerCallEvents;
+use Throwable;

 /**
  * Class ActionHangupChan
@@ -49,18 +51,23 @@ public static function execute(WorkerCallEvents $worker, array $data): void
         // Remove the agi_channel from the active channels in the worker.
         $worker->removeActiveChan($data['agi_channel']);

-        // Initialize arrays for channels and transfer calls.
-        $channels = [];
-        $transfer_calls = [];
+        try {
+            // Initialize arrays for channels and transfer calls.
+            $channels = [];
+            $transfer_calls = [];

-        // Hangup channel for end calls.
-        self::hangupChanEndCalls($worker, $data, $transfer_calls, $channels);
+            // Hangup channel for end calls.
+            self::hangupChanEndCalls($worker, $data, $transfer_calls, $channels);

-        // Check if it's a regular transfer.
-        CreateRowTransfer::execute($worker, 'hangup_chan', $data, $transfer_calls);
+            // Check if it's a regular transfer.
+            CreateRowTransfer::execute($worker, 'hangup_chan', $data, $transfer_calls);

-        // Check if it's a SIP transfer.
-        self::hangupChanCheckSipTrtansfer($worker, $data, $channels);
+            // Check if it's a SIP transfer.
+            self::hangupChanCheckSipTrtansfer($worker, $data, $channels);
+        } catch (Throwable $e) {
+            // Prevent crash loop when CDR DB is unavailable or table is missing (issue #1000).
+            CriticalErrorsHandler::handleExceptionWithSyslog($e);
+        }

         // Clear memory.
         if (isset($worker->checkChanHangupTransfer[$data['agi_channel']])) {
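
One effect of the try/catch above is that the in-memory cleanup after it still runs when the database step throws. A minimal sketch of that pattern, assuming hypothetical names throughout (`handleHangup()`, the `$log` callback standing in for `CriticalErrorsHandler::handleExceptionWithSyslog()`, and a plain array in place of the worker state):

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical stand-in for an event handler: runs a DB-touching step,
 * reports any failure through $log, and always performs the cleanup.
 */
function handleHangup(callable $dbStep, callable $log, array &$activeChannels, string $channel): void
{
    try {
        $dbStep();
    } catch (Throwable $e) {
        // Contain the failure so the worker process keeps running.
        $log($e);
    }

    // Cleanup is reached whether or not the DB step failed.
    unset($activeChannels[$channel]);
}

$active = ['SIP/101-0001' => true];
$logged = [];
handleHangup(
    static function (): void {
        throw new RuntimeException('no such table: cdr');
    },
    static function (Throwable $e) use (&$logged): void {
        $logged[] = $e->getMessage();
    },
    $active,
    'SIP/101-0001'
);
var_dump($active, $logged);
```

Catching `Throwable` rather than `Exception` also covers PHP `Error` subclasses, which is a broader net than strictly needed for a `PDOException` but matches what the diff does.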
