Dmitry Popov (thedeemon) wrote,
Dmitry Popov
thedeemon

Categories:

ICFPC'09 и ускорение интерпретатора

Несколько дней назад закончилось соревнование ICFP Contest 2009, мой небольшой отчет об участии в котором можно почитать здесь. Там в задании была описана несложная виртуальная машина, на которой запускались данные организаторами программы для моделирования орбитальной механики. Тут я расскажу, как, используя возможности функционального языка программирования, можно заметно ускорить интерпретатор.

Update: Это НЕ попытка сравнить скорости С++ и Окамла. Это НЕ попытка описать самый быстрый подход к интерпретации. Это просто техническая часть отчета об ICFPC с рассказом о примененном мною там приеме.
ВМ имеет память из 16384 вещественных чисел и программу не большей длины. Программа состоит из последовательности команд, выполняющихся по порядку, без ветвлений. Команды бывают без аргументов, с одним или с двумя аргументами. Для большинства команд результат выполнения (вещественное число) записывается в ячейку памяти с номером, равным номеру команды в программе. Аргументы берутся из памяти, их адреса закодированы в одном двойном слове с командой. Есть одноаргументные операции сравнения с нулем, их результат (истина или ложь) сохраняется в неком регистре статуса, который потом используется в команде выбора, записывающей в свою ячейку содержимое либо первого аргумента, либо второго, в зависимости от текущего значения статуса.
С учетом способа кодирования инструкций и аргументов, интерпретатор этой ВМ на С++ выглядит так:

  void run()
  {
    for(int ip=0; ip<prglen; ip++)   {
      const int dop = prg[ip]>>28;
      const int dr1 = (prg[ip] >> 14) & 0x3FFF; 
      const int dr2 = prg[ip] & 0x3FFF;
      switch(dop) {
        case 1: mem[ip] = mem[dr1] + mem[dr2]; break;
        case 2: mem[ip] = mem[dr1] - mem[dr2]; break;
        case 3: mem[ip] = mem[dr1] * mem[dr2]; break;
        case 4: mem[ip] = (mem[dr2]==0.0) ? 0.0 : mem[dr1] / mem[dr2]; break;
        case 5: out[dr1] = mem[dr2]; break;
        case 6: mem[ip] = status ? mem[dr1] : mem[dr2]; break;
        case 0:  { 
            const int sop = prg[ip] >> 24;
            const int imm = (prg[ip] >> 21) & 7;
            const int sr1 = dr2;
            switch(sop) {          
              case 0: break;
              case 1: switch(imm) {
                    case 0: status = mem[sr1] < 0; break;
                    case 1: status = mem[sr1] <= 0; break;
                    case 2: status = mem[sr1] == 0; break; 
                    case 3: status = mem[sr1] >= 0; break; 
                    case 4: status = mem[sr1] > 0; break; 
                    default: printf("bad imm=%d ip=%d\n", imm, ip); return;
                  } break;
              case 2: mem[ip] = sqrt(mem[sr1]); break;
              case 3: mem[ip] = mem[sr1]; break;
              case 4: mem[ip] = inp[sr1]; break;
              default: printf("bad sop=%d ip=%d\n", sop, ip); return;
            }
            }
            break;
        default: printf("bad dop=%d ip=%d\n", dop, ip); return;
      } 
    }
  }

Такая реализация на моей рабочей машинке 2,33 ГГц выполняет первую программу из 266 команд 377 000 раз в секунду. Но при участии в ICPFC я все писал на OCaml'e. На нем систему команд ВМ я описал в виде алгебраического типа, расшифровку команд и аргументов вынес отдельно, и интерпретатор выглядел так:

let eval_instr ip vm = function
  | Add(r1,r2) -> vm.mem.(ip) <- vm.mem.(r1) +. vm.mem.(r2)
  | Sub(r1,r2) -> vm.mem.(ip) <- vm.mem.(r1) -. vm.mem.(r2) 
  | Mul(r1,r2) -> vm.mem.(ip) <- vm.mem.(r1) *. vm.mem.(r2)
  | Div(r1,r2) -> let v2 = vm.mem.(r2) in vm.mem.(ip) <- if v2=0.0 then 0.0 else vm.mem.(r1) /. v2 
  | Out(r1,r2) -> vm.outports.(r1) <- vm.mem.(r2)
  | Phi(r1,r2) -> vm.mem.(ip) <- if vm.status then vm.mem.(r1) else vm.mem.(r2)
  | Nop -> ()
  | Cmpz(LTZ, r1) -> vm.status <- vm.mem.(r1) < 0.0
  | Cmpz(LEZ, r1) -> vm.status <- vm.mem.(r1) <= 0.0
  | Cmpz(EQZ, r1) -> vm.status <- vm.mem.(r1) = 0.0
  | Cmpz(GEZ, r1) -> vm.status <- vm.mem.(r1) >= 0.0
  | Cmpz(GTZ, r1) -> vm.status <- vm.mem.(r1) > 0.0
  | Sqrt r1 -> vm.mem.(ip) <- if vm.mem.(r1) >= 0.0 then sqrt vm.mem.(r1) else 0.0
  | Copy r1 -> vm.mem.(ip) <- vm.mem.(r1)
  | Inp r1 -> vm.mem.(ip) <- vm.inports.(r1);;

let interpret vm = 
  for ip = 0 to vm.prglen - 1 do
    eval_instr ip vm vm.prg.(ip)
  done;
  vm.time <- vm.time + 1;
  if vm.outports.(0)>0.0 && vm.scoretime=0 then vm.scoretime <- vm.time;;


Тут после собственно исполнения программы идет увеличение счетчика времени и проверка на выполнение задания, в варианте на С++ их нет. Такой интепретатор на Окамле наглядней, меньше подвержен ошибкам, и написался очень быстро, но работал несколько медленнее: только 263 000 итераций в секунду. Захотелось его ускорить, и были сделаны две оптимизации.

Оптимизация первая.
В исходном виде интерпретатора у нас в цикле крутится switch. Но от него можно избавиться, вынеся его за пределы цикла. Для этого мы каждую команду программы превратим в функцию. Она будет выполнять предписанное командой действие и передавать управление следующей такой функции. Вместо пробегания по программе в цикле мы будем вызывать функцию для первой команды, она - функцию для второй и т.д. При "компиляции" каждой команды нам нужно иметь уже "скомпилированную" функцию для следующей, поэтому эту псевдокомпиляцию будем проводить с конца программы. Самой последней команде в качестве такой функции-продолжения мы дадим функцию, выполняющую действия, которые нужно сделать в конце каждой итерации - увеличение счетчика времени и проверка очков. У ВМ есть особая инструкция Noop (у меня она обозначена более привычно Nop), которая ничего не делает. Для нее новой функции создаваться не будет, а просто полученное продолжение будет передаваться дальше. Благодаря этому все nop'ы и их последовательности будут вообще бесплатны. Отдельным образом будут "компилироваться" команды сравнения и выбора. Они всегда идут подряд парами - сначала сравнение аргумента с нулем Cmp*, затем выбор результата Phi. По сути это одна трехаргументная команда. Поэтому последовательность этих двух команд будет "компилироваться" в одну функцию. Для этого результатом "компиляции" Phi (которая в программе идет позже, поэтому "компилируется" раньше) будет не функция-продолжение, а структура с ее аргументами. При компиляции команды сравнения эта структура будет использоваться для создания функции, объединяющей в себе Cmp* и Phi.
Опишем тип результата псевдокомпиляции. Это или функция-продолжение, или аргументы Phi и ее продолжение:

type comp_res = Cont of (unit -> unit) | Phiargs of int * int * int * (unit -> unit);;   

Псевдокомпиляция команды ВМ в такой результат делается просто:

let comp_instr ip vm ins nxt = 
  match ins, nxt with
  | Add(r1,r2), Cont k -> Cont(fun () -> vm.mem.(ip) <- vm.mem.(r1) +. vm.mem.(r2); k ())    
  | Sub(r1,r2), Cont k -> Cont(fun () -> vm.mem.(ip) <- vm.mem.(r1) -. vm.mem.(r2); k ())    
  | Mul(r1,r2), Cont k -> Cont(fun () -> vm.mem.(ip) <- vm.mem.(r1) *. vm.mem.(r2); k ())    
  | Div(r1,r2), Cont k -> Cont(fun () -> 
      let v2 = vm.mem.(r2) in vm.mem.(ip) <- if v2=0.0 then 0.0 else vm.mem.(r1) /. v2; k ())  
  | Out(r1,r2), Cont k -> Cont(fun () -> vm.outports.(r1) <- vm.mem.(r2); k ())    
  | Phi(r1,r2), Cont k -> Phiargs(r1, r2, ip, k)
  | Nop, Cont k -> Cont k 
  | Cmpz(LTZ, r1), Phiargs(p1,p2, pip, k) -> Cont(fun () -> 
      vm.mem.(pip) <- if vm.mem.(r1) < 0.0 then vm.mem.(p1) else vm.mem.(p2); k ())
  | Cmpz(LEZ, r1), Phiargs(p1,p2, pip, k) -> Cont(fun () -> 
      vm.mem.(pip) <- if vm.mem.(r1) <= 0.0 then vm.mem.(p1) else vm.mem.(p2); k ())
  | Cmpz(EQZ, r1), Phiargs(p1,p2, pip, k) -> Cont(fun () -> 
      vm.mem.(pip) <- if vm.mem.(r1) = 0.0 then vm.mem.(p1) else vm.mem.(p2); k ())
  | Cmpz(GEZ, r1), Phiargs(p1,p2, pip, k) -> Cont(fun () -> 
      vm.mem.(pip) <- if vm.mem.(r1) >= 0.0 then vm.mem.(p1) else vm.mem.(p2); k ())
  | Cmpz(GTZ, r1), Phiargs(p1,p2, pip, k) -> Cont(fun () -> 
      vm.mem.(pip) <- if vm.mem.(r1) > 0.0 then vm.mem.(p1) else vm.mem.(p2); k ())
  | Sqrt r1, Cont k -> Cont(fun () -> vm.mem.(ip) <- if vm.mem.(r1) >= 0.0 then sqrt vm.mem.(r1) else 0.0; k ())
  | Copy r1, Cont k -> Cont(fun () -> vm.mem.(ip) <- vm.mem.(r1); k ())
  | Inp r1,  Cont k -> Cont(fun () -> vm.mem.(ip) <- vm.inports.(r1); k ())
  | _, _ -> Printf.printf "compile error: ip=%d\n" ip; failwith "comp_instr";;  

А псевдокомпиляция всей программы будет выглядеть так:

let comp_prg vm = 
  let rec loop ip nxt = 
    if ip < 0 then nxt else
    let ci = comp_instr ip vm vm.prg.(ip) nxt in
    loop (ip-1) ci  in
  match loop (vm.prglen-1) (Cont(fun ()-> 
    vm.time <- vm.time + 1;  if vm.outports.(0)>0.0 && vm.scoretime=0 then vm.scoretime <- vm.time)) 
  with Cont k -> k  | Phiargs _ -> failwith "comp_prg: Phiargs returned";;  

Функция comp_prg получает на вход структуру ВМ, содержащую программу и память, а на выход выдает функцию, которая выполняет один проход программы данного экземпляра ВМ. Ее можно вызывать вместо interpret vm, и скорость получается уже 417 000 итераций в секунду, т.е. уже быстрее, чем реализация ВМ на С++. Но и это не предел.

Оптимизация вторая.
На каждой итерации ВМ читает какие-то данные из входных портов, производит вычисления, зависящие от них и от состояния памяти, пишет что-то в память и выдает что-то на выходные порты. В данном случае на входе были моментальные изменения скорости спутника, а на выходе его координаты, остаток топлива и другие данные. Большую часть времени спутник просто летит по орбите без использования двигателей, т.е. значения входных портов для большинства итераций одни и те же. Это можно использовать и убрать из цикла вычисления, зависящие от неизменяющихся данных, записав вместо них Nop'ы с уже посчитанным результатом. Алгоритм оптимизации очень прост. Заводим массив булевых значений, показывающих для каждой команды является ли ее результат константой при условии константности входов. Изначально это истина для команд Input и Nop и ложь для всех остальных. Дальше проходим по программе и для каждой операции кроме Output смотрим не являются ли константами ее аргументы. Если да, то вычисляем результат операции, записываем в нужную ячейку памяти, а команду заменяем на Nop, пометив ее как константу. Так делаем несколько раз: на каждом проходе число константных операций или увеличивается (когда заменили очередную команду) или не изменяется (когда больше уже нечего заменить). Если после прохода ничего не изменилось, значит мы попали в Fixed Point и оптимизация готова, возвращаем новый экземпляр ВМ с новой программой и новым содержимым памяти:

let optimize vm = 
  let prg = Array.sub vm.prg 0 (vm.prglen) in  
  let propagate cons mem p = 
    let con = Array.copy cons in
    let p' = p |> Array.mapi (fun ip ins ->    
      match ins with 
      | Add(r1,r2) -> if con.(r1) && con.(r2) then (mem.(ip) <- mem.(r1) +. mem.(r2); con.(ip)<-true; Nop) else ins 
      | Sub(r1,r2) -> if con.(r1) && con.(r2) then (mem.(ip) <- mem.(r1) -. mem.(r2); con.(ip)<-true; Nop) else ins
      | Mul(r1,r2) -> if con.(r1) && con.(r2) then (mem.(ip) <- mem.(r1) *. mem.(r2); con.(ip)<-true; Nop) else ins
      | Div(r1,r2) -> if con.(r1) && con.(r2) then 
                        (mem.(ip) <- if mem.(r2)=0.0 then 0.0 else mem.(r1) /. mem.(r2); con.(ip)<-true; Nop) else ins 
      | Out(r1,r2) -> ins
      | Phi(r1,r2) -> if ip>0 then begin
                        match prg.(ip-1) with 
                        | Cmpz _ ->  if con.(ip-1) then
                                      if mem.(ip-1) > 0.0 then Copy r1 else Copy r2
                                    else ins
                        | _ -> ins
                      end  else ins
      | Nop -> ins
      | Cmpz(LTZ, r1) -> if con.(r1) then (mem.(ip) <- if mem.(r1) < 0.0 then 1.0 else 0.0; con.(ip)<-true; Nop) else ins
      | Cmpz(LEZ, r1) -> if con.(r1) then (mem.(ip) <- if mem.(r1) <= 0.0 then 1.0 else 0.0; con.(ip)<-true; Nop) else ins
      | Cmpz(EQZ, r1) -> if con.(r1) then (mem.(ip) <- if mem.(r1) = 0.0 then 1.0 else 0.0; con.(ip)<-true; Nop) else ins
      | Cmpz(GEZ, r1) -> if con.(r1) then (mem.(ip) <- if mem.(r1) >= 0.0 then 1.0 else 0.0; con.(ip)<-true; Nop) else ins
      | Cmpz(GTZ, r1) -> if con.(r1) then (mem.(ip) <- if mem.(r1) > 0.0 then 1.0 else 0.0; con.(ip)<-true; Nop) else ins
      | Sqrt r1 -> if con.(r1) then (mem.(ip) <- if mem.(r1) >= 0.0 then sqrt mem.(r1) else 0.0; con.(ip)<-true; Nop) else ins
      | Copy r1 -> if con.(r1) then (mem.(ip) <- mem.(r1); con.(ip)<-true; Nop) else ins
      | Inp r1 -> mem.(ip) <- vm.inports.(r1); con.(ip)<-true; Nop) in
    con, p'   in
  let mem = Array.copy vm.mem in
  let rec loop p con =
    let con', p' = propagate con mem p in
    if con' = con && p = p' then p, con else loop p' con'  in 
  let const = prg |> Array.map (function Nop | Inp _ -> true | _ -> false) in
  let p, con = loop prg const  in
  { prg = p; prglen = vm.prglen; mem = mem; 
    inports = Array.copy vm.inports; outports = Array.copy vm.outports; status = vm.status;
    time = vm.time; scenario = vm.scenario; log = vm.log; scoretime = vm.scoretime  };;      

Такая оптимизация на первой задаче, например, увеличивает количество пустых команд с 11% до 39%. Оптимизированная программа подвергается описанной выше псевдокомпиляции, и скорость возрастает до 650 000 итераций в секунду, что в 1,7 раза быстрее варианта на С++ и в 2,5 раза быстрее исходного интепретатора на Окамле. Если наш спутник меняет траекторию, то в этом момент используется неоптимизированный вариант, а после того, как он ляжет на новую орбиту с выключенным двигателем, текущий экземпляр ВМ проходит оптимизацию и опять получаем быстрый ваприант уже с новыми значениями.
Tags: fp, interpreter optimization, ocaml, vm
Subscribe
  • Post a new comment

    Error

    Anonymous comments are disabled in this journal

    default userpic

    Your IP address will be recorded 

  • 46 comments