Рубрики
Без рубрики

Вычисление корреляционной матрицы Пирсона на огромных наборах данных в Python

Одна из последних задач в Goodip состояла в том, чтобы вычислить сходство между примерно 480 тысячами предметов, имеющих … помеченные Python, Bigdata.

Одна из последних задач в Goodip должен был рассчитать сходство между около 480 тысяч предметов, имеющих около 800 наблюдений в диапазоне 0–50 тыс. Каждый. Сокращение размерности поставит под угрозу качество результатов длинного хвоста, что нежелательно. В следующей статье оценивается производительность различных реализаций, описывает, как разделить набор данных на несколько кусков и обработать их параллельно. Я также хочу упомянуть DeepGraph , который обеспечивает интересный, другой подход. В качестве короткого освежения корреляция Пирсона присваивает значение в диапазоне -1, что означает отрицательную корреляцию (если значение в увеличении, соответствующее значение в B уменьшается) и 1, положительная корреляция (если увеличивается B также увеличивается ) Значение 0 означает отсутствие корреляции. Это определено как где x̄ и ȳ являются средствами значений в x и y.

В нашем случае файл CSV, содержащий набор данных, составляет около 1 ГБ. Мы собираемся загрузить его и создать из него Numpy Ndarray.

items = pd.read_csv("./items.csv")
numpy_items = items.to_numpy() 

Просто попытка запустить np.corrcoef (numpy_items) повышает исключение MemoryError: невозможно выделить 1,6 TIB для массива с формой (480000, 480000) и типа данных Float64 – что иллюстрирует измерение проблемы. Фактическое количество пар, которые на самом деле необходимо обработать, – это количество различных, неупорядоченных комбинаций – выбор R -объектов из набора n объектов. ncr (n, r) = n!/r! (n – r)! В этом случае NCR (480000,2) , но все еще 115 199 760 000 комбинаций.

Давайте сначала сравним производительность различных реализаций из Pandas, Numpy и Cupy и выясним, что работает с параллелизацией. Поэтому мы используем только 1% подмножество данных.

import numpy as np
import cupy as cp
import pandas as pd
from scipy.stats import pearsonr
cp.cuda.set_allocator(None) # Disable cache
items = items[0:5000]
numpy_items = items.to_numpy()
cupy_items = cp.array(numpy_items)

# Pandas implementation
%%timeit -n 1 -r 5
r1 = items.T.corr()
# 1 loop, best of 5: 37 s per loop

# NumPy implementation
%%timeit -n 1 -r 5
r2 = np.corrcoef(numpy_items)
# 1 loop, best of 5: 1.21 s per loop

# CuPy implementation (on Tesla K80)
%%timeit -n 1 -r 5
r3 = cp.corrcoef(cupy_items)
# 1 loop, best of 5: 66 ms per loop

# NumPy custom
%%timeit -n 1 -r 5
ms = numpy_profiles.mean(axis=1)[(slice(None,None,None),None)]
datam = numpy_profiles - ms
datass = np.sqrt(np.sum(datam*datam, axis=1))
r4 = []
for i in range(numpy_profiles.shape[0]):
    temp = np.dot(datam[i:], datam[i].T)
    rs = temp / (datass[i:] * datass[i])
    r4.append(rs)
# 1 loop, best of 5: 44.7 s per loop

# CuPy custom version (on Tesla K80)
%%timeit -n 1 -r 5
ms = cupy_profiles.mean(axis=1)[(slice(None,None,None),None)]
datam = cupy_profiles - ms
datass = cp.sqrt(cp.sum(datam*datam, axis=1))
r5 = []
for i in range(cupy_profiles.shape[0]):
    temp = cp.dot(datam[i:], datam[i].T)
    rs = temp / (datass[i:] * datass[i])
    r5.append(rs)
# 1 loop, best of 5: 2.35 s per loop

Графические процессоры отлично работают с матричными операциями, что может быть причиной, почему Cupy превосходит другие реализации на сегодняшний день. Этот подход также может превзойти наш параллелизированный процессовый подход и будет оцениваться в последующей статье. Далее, однако, мы используем пользовательскую реализацию, которая использует Numpy, предварительно рассчитанные средства, и может быть легко распределена по различным процессам.

Поскольку мы не хотим предварительно выпускать все пары, в основном из -за ограничений памяти, подход состоит в том, чтобы взять одну строку, вычислить корреляцию со всеми другими строками ниже этого индекса, увеличения индекса и т. Д. Таким образом, мы исключаем рефлексивные сравнения и используем коммутативность функции, игнорируя порядок. Вычислительные усилия для первых индексов тогда намного выше, чем для последнего, и мы хотим разделить только на полных позициях индекса, оба принятые во внимание со следующим наивным разделением. Нам нужна функция NCR для количества общих комбинаций.

import operator as op
def ncr(n: int, r: int) -> int:
    """
    Calculates the number of different, unordered combination 
    of r objects from a set of n objects.
    nCr(n,r) = nPr(n,r) / r!

    Args:
        n (int): Number of objects in set
        r (int): Number of objects to combine
    Returns:
        int: Number of combinations
    """
    r = min(r, n-r)
    numer = reduce(op.mul, range(n, n-r, -1), 1)
    denom = reduce(op.mul, range(1, r+1), 1)
    return numer // denom

Поскольку Python 3.8 вы также можете использовать Math.comb. Следующий код находит индексы, которые разделяют индекс на N_CHUNKS, близкие к частям равного размера.

n_chunks = 50 # Number of chunks
n = numpy_items.shape[0] # Number items
nr_pairings = ncr(numpy_items.shape[0], 2) # Number of all pairings
# Minimum nr. of pairings per chunk
per_chunk = int(math.ceil(nr_pairings/(n_chunks - 1)))
split_indices = [] # Array containing the indices to split at
t = 0
for i in range(n + 1):
    # There are n - i pairings at index i
    t += n - i
    # If the current chunk has enough pairings 
    if t >= per_chunk:
        split_indices.append(i)
        t = 0

Мы можем проверить количество пар в каждом запуске чанка:

s_indices = [0] + split_indices + [n]
pairings_chunks = []
for i in range(len(s_indices)-1):
    start = s_indices[i]
    end = s_indices[i + 1]
    pairings_chunks.append(sum(range(n - end, n - start)))

Сначала мы предварительно выпускаем средства набора данных. Поскольку данные только для чтения, мы можем просто определить их в глобальной области. Все дочерние процессы смогут получить к нему доступ, и они не будут скопированы – при условии, что вы не пишете его.

ms = numpy_profiles.mean(axis=1)[(slice(None,None,None),None)]
datam = numpy_profiles - ms
datass = np.sqrt(np.sum(datam*datam, axis=1))

Первоначальная идея состояла в том, чтобы отправить результаты от работников на специального работника хранения через очередь. К сожалению, это значительно снизило производительность из -за блокировки и сериализации. Самым быстрым вариантом для реализации было написать результаты на диск непосредственно от работника.

output_dir = "/var/tmp/"
def processor_fun(*, i_start: int, i_end: int) -> None:
    """
    Calculate correlation of rows in the range of i_start and i_end
    and the rows below these indices.
Args:
        i_start (int): Index of the start row
        i_end (int): Index of the end row
    Returns:
        None
    """    
    for i in range(i_start, i_end):
        temp = np.dot(datam[i:], datam[i].T)
        rs = temp / (datass[i:] * datass[i])
        rs = np.delete(rs, [0])
        # Create nparray that contains information about the indices
        res = np.zeros((rs.shape[0], 3))
        res[:, 0] = i
        res[:, 1] = list(range(i+1, i + rs.shape[0] + 1))
        res[:, 2] = rs
        # Save CSV file
        np.savetxt(f'{output_dir}/{i}.csv', res,
                   delimiter=',',
                   newline='\n', 
                   fmt=['%d','%d','%0.13f'])
# Create pool of worker processes which will carry out the
# computation
n_cpus = mp.cpu_count()
pool = mp.Pool(n_cpus)

# Start workers
s_indices = [0] + split_indices
workers = []
for i in range(0, len(s_indices)):
    start = s_indices[i]
    end = s_indices[i+1] if i < len(s_indices)-1 else n-1
    workers.append(pool.apply_async(
        processor_fun, 
        kwds={'i_start': start, 'i_end': end}))
for r in workers:
    r.wait()
# Close the pool and wait till all workers are done
pool.close()
pool.terminate()
pool.join()

С помощью нашего набора данных это создает 480K CSV -файлы с общим размером ок. 4TB. Мы приказались, и GZIP сжал результаты до частей ~ 4 ГБ и загрузили их в Google BigQuery для производства.

Оригинал: “https://dev.to/linuskohl/computing-the-pearson-correlation-matrix-on-huge-datasets-in-python-561”