R0 CREW

Патчим Perl для устранения Undefined Behavior

Проблема

Представьте себе такую задачу: необходимо эффективно читать с диска большие объёмы данных в последовательном режиме. Для этого вы создаёте модуль чтения с буферизацией, а полученные байты распаковываете в нужный вид стандартной функцией unpack.

my $block_size = 0x100000;
my $buf = RCE::File::read_bytes($fd, $block_size);
my @data;
for (my $i = 0; $i < $block_size; $i += 16) {
    # Распаковываем 4 дворда по смещению i
    push @data, unpack('@' . $i . 'V4', $buf);
}

Если внимательно прочитать документацию по unpack, то окажется, что возвращаемое значение в некоторых случаях не определено:

http://perldoc.perl.org/functions/unpack.html

Утверждается, что если в шаблоне указано больше полей, чем данных в буфере, то функция может либо уменьшить их количество, либо вернуть пустые строки или нули для лишних полей, либо выбросить исключение.

Проверим, что произойдёт при указании в шаблоне лишних полей различного типа.

use strict;
use warnings;
use Data::Dumper;

print "Unsigned byte:\n";
print Dumper [unpack("C", "")];
print Dumper [unpack("C*", "")];
print Dumper [unpack("C4", "ZZZ")];

print "\nUnsigned word:\n";
print Dumper [unpack("v", "")];
print Dumper [unpack("v*", "")];
print Dumper [unpack("v4", "ZZZ")];

print "\nUnsigned dword:\n";
print Dumper [unpack("V", "")];
print Dumper [unpack("V*", "")];
print Dumper [unpack("V4", "ZZZ")];

print "\nBinary string:\n";
print Dumper [unpack("a", "")];
print Dumper [unpack("a*", "")];
print Dumper [unpack("a4", "ZZZ")];

print "\nASCII string:\n";
print Dumper [unpack("A", "")];
print Dumper [unpack("A*", "")];
print Dumper [unpack("A4", "ZZZ")];

print "\nASCIIZ string:\n";
print Dumper [unpack("Z", "")];
print Dumper [unpack("Z*", "")];
print Dumper [unpack("Z4", "ZZZ")];

Для каждого типа делается по три вызова: распаковать одно поле, распаковать поля до конца буфера, распаковать 4 поля. Возвращаемые значения кладутся в анонимный массив для удобства. Вывод будет следующим.

Unsigned byte:
$VAR1 = [];
$VAR1 = [];
$VAR1 = [
          90,
          90,
          90
        ];

Unsigned word:
$VAR1 = [];
$VAR1 = [];
$VAR1 = [
          23130
        ];

Unsigned dword:
$VAR1 = [];
$VAR1 = [];
$VAR1 = [];

Binary string:
$VAR1 = [
          ''
        ];
$VAR1 = [
          ''
        ];
$VAR1 = [
          'ZZZ'
        ];

ASCII string:
$VAR1 = [
          ''
        ];
$VAR1 = [
          ''
        ];
$VAR1 = [
          'ZZZ'
        ];

Null-terminated ASCII string:
$VAR1 = [
          ''
        ];
$VAR1 = [
          ''
        ];
$VAR1 = [
          'ZZZ'
        ];

Разберём каждую секцию. Распаковка “Unsigned byte” для одного и неограниченного числа полей вернула undef (модуль Data::Dumper отобразил пустой массив как “$VAR1 = [];”). Для четырёх полей функция уменьшила счётчик с 4-х до 3-х и вернула три байта. Пока всё как и ожидалось. “Unsigned word” и “Unsigned dword” ведут себя точно так же.

Со строковыми типами ситуация иная, вместо undef для первых двух вызовов возвращаются пустые строки. Третьи вызовы, как и ожидалось, возвращают столько символов, сколько имеется в буфере.

Почему возврат пустой строки вместо undef является проблемой? Представьте, что нужно распаковать попеременно несколько чисел и строк, как в коде ниже.

print Dumper [unpack("V A3 A4", "\x01\x00\x00\x00")];

Результат получится безрадостным.

$VAR1 = [
          1,
          '',
          ''
        ];

Безрадостен он по той причине, что мы не можем определить, произошла ошибка или нет. Вместо того, чтобы проверить размер массива операцией scalar со сложностью О(1) и по нему сказать, все ли элементы были распакованы, мы вынуждены идти по всему массиву от начала до первой пустой строки, что расходует уже О(n). Для массивов размером в тысячи и более элементов такой вариант является неприемлемым. Можно, конечно, сначала искать первую попавшуюся пустую строку бинарным поиском, а потом идти назад, к первой пустой строке, но это всё ещё не лучший вариант.

Если бы как-то добиться проверки успешности распаковки за константное время, то даже в случае, когда ошибка произойдёт при распаковке последней строки и та будет обрезана, мы обнаружим ошибку путём сравнения длины этой строки с ожидаемой.

Решение

Вносить свои правки в программу такого масштаба, как интерпретатор Perl, может быть не лучшей идеей в большинстве случаев, ибо неясно, где и как они могут аукнуться. С другой стороны, мы собираемся исправлять неопределённое поведение, от которого в идеальном мире ничто не зависит. Ну, волков бояться - в лес не ходить.

Скачаем исходники Perl и взглянем на файл perl-5.26.1/pp_pack.c, в котором реализованы функции pack/unpack. Нас интересуют обработчики строковых типов “a”, “A”, “Z”.

STATIC I32
S_unpack_rec(pTHX_ tempsym_t* symptr, const char *s, const char *strbeg, const char *strend, const char **new_s )
{
...
	switch(TYPE_NO_ENDIANNESS(datumtype)) {
	default:
	    Perl_croak(aTHX_ "Invalid type '%c' in unpack", (int)TYPE_NO_MODIFIERS(datumtype) );
...
	case '%':
	    if (howlen == e_no_len)
		len = 16;		/* len is not specified */
	    checksum = len;
	    cuv = 0;
	    cdouble = 0;
	    continue;
...
	case 'A':
	case 'Z':
	case 'a':
	    if (checksum) {
		/* Preliminary length estimate is assumed done in 'W' */
		if (len > strend - s) len = strend - s;
		goto W_checksum;
	    }
	    if (utf8) {
		I32 l;
		const char *hop;
		for (l=len, hop=s; l>0; l--, hop += UTF8SKIP(hop)) {
		    if (hop >= strend) {
			if (hop > strend)
			    Perl_croak(aTHX_ "Malformed UTF-8 string in unpack");
			break;
		    }
		}
		if (hop > strend)
		    Perl_croak(aTHX_ "Malformed UTF-8 string in unpack");
		len = hop - s;
	    } else if (len > strend - s)
		len = strend - s;

	    if (datumtype == 'Z') {
		/* 'Z' strips stuff after first null */
		const char *ptr, *end;
		end = s + len;
		for (ptr = s; ptr < end; ptr++) if (*ptr == 0) break;
		sv = newSVpvn(s, ptr-s);
		if (howlen == e_star) /* exact for 'Z*' */
		    len = ptr-s + (ptr != strend ? 1 : 0);
	    } else if (datumtype == 'A') {
		/* 'A' strips both nulls and spaces */
		const char *ptr;
		if (utf8 && (symptr->flags & FLAG_WAS_UTF8)) {
                    for (ptr = s+len-1; ptr >= s; ptr--) {
                        if (   *ptr != 0
                            && !UTF8_IS_CONTINUATION(*ptr)
                            && !isSPACE_utf8_safe(ptr, strend))
                        {
                            break;
                        }
                    }
		    if (ptr >= s) ptr += UTF8SKIP(ptr);
		    else ptr++;
		    if (ptr > s+len)
			Perl_croak(aTHX_ "Malformed UTF-8 string in unpack");
		} else {
		    for (ptr = s+len-1; ptr >= s; ptr--)
			if (*ptr != 0 && !isSPACE(*ptr)) break;
		    ptr++;
		}
		sv = newSVpvn(s, ptr-s);
	    } else sv = newSVpvn(s, len);

	    if (utf8) {
		SvUTF8_on(sv);
		/* Undo any upgrade done due to need_utf8() */
		if (!(symptr->flags & FLAG_WAS_UTF8))
		    sv_utf8_downgrade(sv, 0);
	    }
	    mXPUSHs(sv);
	    s += len;
	    break;
...
}

Разберёмся, что тут происходит. Сначала выполняется проверка чексуммы, если та задана.

	    if (checksum) {
		/* Preliminary length estimate is assumed done in 'W' */
		if (len > strend - s) len = strend - s;
		goto W_checksum;
	    }

Устанавливается контрольная сумма в единственном месте, в отдельном блоке case, который обрабатывает неописанный в документации тип “%”. Предположительно, используется только внутри интерпретатора.

	case '%':
	    if (howlen == e_no_len)
		len = 16;		/* len is not specified */
	    checksum = len;
	    cuv = 0;
	    cdouble = 0;
	    continue;

Далее корректируется длина строки, если это необходимо. Если строка - UTF8, проверяется её валидность.

	    if (utf8) {
		I32 l;
		const char *hop;
		for (l=len, hop=s; l>0; l--, hop += UTF8SKIP(hop)) {
		    if (hop >= strend) {
			if (hop > strend)
			    Perl_croak(aTHX_ "Malformed UTF-8 string in unpack");
			break;
		    }
		}
		if (hop > strend)
		    Perl_croak(aTHX_ "Malformed UTF-8 string in unpack");
		len = hop - s;
	    } else if (len > strend - s)
		len = strend - s;

Теперь начинается непосредственно обработка трёх типов. Если тип - “Z” (ASCIIZ-строка), ищется первый нулевой байт в строке и вызывается макрос newSVpvn.

	    if (datumtype == 'Z') {
		/* 'Z' strips stuff after first null */
		const char *ptr, *end;
		end = s + len;
		for (ptr = s; ptr < end; ptr++) if (*ptr == 0) break;
		sv = newSVpvn(s, ptr-s);
		if (howlen == e_star) /* exact for 'Z*' */
		    len = ptr-s + (ptr != strend ? 1 : 0);

Сделаем небольшое отступление. Во внутреннем устройстве Перла три основных типа данных: SV (Scalar Value) - строки, числа и указатели; AV (Array Value) - массивы; HV (Hash Value) - хэши. SV, в свою очередь, подразделяется на IV (Signed Integer Value), UV (Unsigned Integer Value), NV (Double), PV (Pointer Value).

В коде выше вызывается макрос newSVpvn, который дословно означает “создать новое SV-значение с типом PV”. Он принимает на вход указатель на данные, необязательно строку, и их длину, и на их основе создаёт SV-значение. Прелесть в том, этот макрос не копирует строку, а лишь создаёт для неё специальный хидер, чтобы эта строка стала SV.

Вернёмся к коду, который затем обрабатывает тип “A” (ASCII-строка с обрезанными пробелами и нуль-байтами на конце). После удаления лишних символов точно так же вызывается newSVpvn для создания SV из строки.

	    } else if (datumtype == 'A') {
		/* 'A' strips both nulls and spaces */
		const char *ptr;
		if (utf8 && (symptr->flags & FLAG_WAS_UTF8)) {
                    for (ptr = s+len-1; ptr >= s; ptr--) {
                        if (   *ptr != 0
                            && !UTF8_IS_CONTINUATION(*ptr)
                            && !isSPACE_utf8_safe(ptr, strend))
                        {
                            break;
                        }
                    }
		    if (ptr >= s) ptr += UTF8SKIP(ptr);
		    else ptr++;
		    if (ptr > s+len)
			Perl_croak(aTHX_ "Malformed UTF-8 string in unpack");
		} else {
		    for (ptr = s+len-1; ptr >= s; ptr--)
			if (*ptr != 0 && !isSPACE(*ptr)) break;
		    ptr++;
		}
		sv = newSVpvn(s, ptr-s);

Наконец, для типа “a” (строка байт) никакой обработки не требуется, просто создаётся SV.

	    } else sv = newSVpvn(s, len);

Теперь, когда строка во внутреннем представлении создана, для UTF8-строк выполняется даунгрейд, иначе говоря конвертирование из символов обратно в байты.

	    if (utf8) {
		SvUTF8_on(sv);
		/* Undo any upgrade done due to need_utf8() */
		if (!(symptr->flags & FLAG_WAS_UTF8))
		    sv_utf8_downgrade(sv, 0);
	    }

В завершении на внутренний стек Перла помещается строка и обработка заканчивается.

	    mXPUSHs(sv);
	    s += len;
	    break;

Здесь я не буду касаться того, как работают стеки Перла, достаточно лишь сказать, что есть прямое соответствие между вызовом mXPUSHs и значением, которое возвращается скрипту на Перле. В частности, вызов mXPUSHs добавляет ещё одну строку в массив, возвращаемый функцией unpack.

Собственно, теперь можно понять, как решить проблему: необходимо проверить длину строки, и если она равна нулю, то не вызывать mXPUSHs - это равносильно возврату undef, что и требуется.

Дифф патча выглядит следующим образом.

--- "perl-5.26.1-origin\\pp_pack.c"     2017-07-19 01:50:13.000000000 +0300
+++ "perl-5.26.1\\pp_pack.c"    2017-11-16 15:53:52.325705000 +0300
@@ -1102,7 +1102,9 @@
                if (!(symptr->flags & FLAG_WAS_UTF8))
                    sv_utf8_downgrade(sv, 0);
            }
-           mXPUSHs(sv);
+        if (SvCUR(sv) != 0) {
+           mXPUSHs(sv);
+        }
            s += len;
            break;
        case 'B':

SvCUR - ещё один макрос, возвращающий длину SV-строки. С помощью него мы добавляем на стек только непустые строки. Патч готов.

Проверка решения

Скомпилируем Perl и сравним нашу версию со Strawberry Perl.

> c:\Strawberry\perl\bin\perl -e "use Data::Dumper; print Dumper [unpack(\"V A3 A4\", \"\x01\x00\x00\x00\")];"

$VAR1 = [
          1,
          '',
          ''
        ];
> perl-5.26.1\perl -e "use Data::Dumper; print Dumper [unpack(\"V A3 A4\", \"\x01\x00\x00\x00\")];"

$VAR1 = [
          1
        ];

Проблема решена.