Одна из последних задач в Goodip должен был рассчитать сходство между около 480 тысяч предметов, имеющих около 800 наблюдений в диапазоне 0–50 тыс. Каждый. Сокращение размерности поставит под угрозу качество результатов длинного хвоста, что нежелательно. В следующей статье оценивается производительность различных реализаций, описывает, как разделить набор данных на несколько кусков и обработать их параллельно. Я также хочу упомянуть DeepGraph , который обеспечивает интересный, другой подход. В качестве короткого освежения корреляция Пирсона присваивает значение в диапазоне -1, что означает отрицательную корреляцию (если значение в увеличении, соответствующее значение в B уменьшается) и 1, положительная корреляция (если увеличивается B также увеличивается ) Значение 0 означает отсутствие корреляции. Это определено как где x̄ и ȳ являются средствами значений в x и y.
В нашем случае файл CSV, содержащий набор данных, составляет около 1 ГБ. Мы собираемся загрузить его и создать из него Numpy Ndarray.
items = pd.read_csv("./items.csv") numpy_items = items.to_numpy()
Просто попытка запустить np.corrcoef (numpy_items) повышает исключение MemoryError: невозможно выделить 1,6 TIB для массива с формой (480000, 480000) и типа данных Float64 – что иллюстрирует измерение проблемы. Фактическое количество пар, которые на самом деле необходимо обработать, – это количество различных, неупорядоченных комбинаций – выбор R -объектов из набора n объектов. ncr (n, r) = n!/r! (n – r)! В этом случае NCR (480000,2) , но все еще 115 199 760 000 комбинаций.
Давайте сначала сравним производительность различных реализаций из Pandas, Numpy и Cupy и выясним, что работает с параллелизацией. Поэтому мы используем только 1% подмножество данных.
import numpy as np import cupy as cp import pandas as pd from scipy.stats import pearsonr cp.cuda.set_allocator(None) # Disable cache items = items[0:5000] numpy_items = items.to_numpy() cupy_items = cp.array(numpy_items) # Pandas implementation %%timeit -n 1 -r 5 r1 = items.T.corr() # 1 loop, best of 5: 37 s per loop # NumPy implementation %%timeit -n 1 -r 5 r2 = np.corrcoef(numpy_items) # 1 loop, best of 5: 1.21 s per loop # CuPy implementation (on Tesla K80) %%timeit -n 1 -r 5 r3 = cp.corrcoef(cupy_items) # 1 loop, best of 5: 66 ms per loop # NumPy custom %%timeit -n 1 -r 5 ms = numpy_profiles.mean(axis=1)[(slice(None,None,None),None)] datam = numpy_profiles - ms datass = np.sqrt(np.sum(datam*datam, axis=1)) r4 = [] for i in range(numpy_profiles.shape[0]): temp = np.dot(datam[i:], datam[i].T) rs = temp / (datass[i:] * datass[i]) r4.append(rs) # 1 loop, best of 5: 44.7 s per loop # CuPy custom version (on Tesla K80) %%timeit -n 1 -r 5 ms = cupy_profiles.mean(axis=1)[(slice(None,None,None),None)] datam = cupy_profiles - ms datass = cp.sqrt(cp.sum(datam*datam, axis=1)) r5 = [] for i in range(cupy_profiles.shape[0]): temp = cp.dot(datam[i:], datam[i].T) rs = temp / (datass[i:] * datass[i]) r5.append(rs) # 1 loop, best of 5: 2.35 s per loop
Графические процессоры отлично работают с матричными операциями, что может быть причиной, почему Cupy превосходит другие реализации на сегодняшний день. Этот подход также может превзойти наш параллелизированный процессовый подход и будет оцениваться в последующей статье. Далее, однако, мы используем пользовательскую реализацию, которая использует Numpy, предварительно рассчитанные средства, и может быть легко распределена по различным процессам.
Поскольку мы не хотим предварительно выпускать все пары, в основном из -за ограничений памяти, подход состоит в том, чтобы взять одну строку, вычислить корреляцию со всеми другими строками ниже этого индекса, увеличения индекса и т. Д. Таким образом, мы исключаем рефлексивные сравнения и используем коммутативность функции, игнорируя порядок. Вычислительные усилия для первых индексов тогда намного выше, чем для последнего, и мы хотим разделить только на полных позициях индекса, оба принятые во внимание со следующим наивным разделением. Нам нужна функция NCR для количества общих комбинаций.
import operator as op def ncr(n: int, r: int) -> int: """ Calculates the number of different, unordered combination of r objects from a set of n objects. nCr(n,r) = nPr(n,r) / r! Args: n (int): Number of objects in set r (int): Number of objects to combine Returns: int: Number of combinations """ r = min(r, n-r) numer = reduce(op.mul, range(n, n-r, -1), 1) denom = reduce(op.mul, range(1, r+1), 1) return numer // denom
Поскольку Python 3.8 вы также можете использовать Math.comb. Следующий код находит индексы, которые разделяют индекс на N_CHUNKS, близкие к частям равного размера.
n_chunks = 50 # Number of chunks n = numpy_items.shape[0] # Number items nr_pairings = ncr(numpy_items.shape[0], 2) # Number of all pairings # Minimum nr. of pairings per chunk per_chunk = int(math.ceil(nr_pairings/(n_chunks - 1))) split_indices = [] # Array containing the indices to split at t = 0 for i in range(n + 1): # There are n - i pairings at index i t += n - i # If the current chunk has enough pairings if t >= per_chunk: split_indices.append(i) t = 0
Мы можем проверить количество пар в каждом запуске чанка:
s_indices = [0] + split_indices + [n] pairings_chunks = [] for i in range(len(s_indices)-1): start = s_indices[i] end = s_indices[i + 1] pairings_chunks.append(sum(range(n - end, n - start)))
Сначала мы предварительно выпускаем средства набора данных. Поскольку данные только для чтения, мы можем просто определить их в глобальной области. Все дочерние процессы смогут получить к нему доступ, и они не будут скопированы – при условии, что вы не пишете его.
ms = numpy_profiles.mean(axis=1)[(slice(None,None,None),None)] datam = numpy_profiles - ms datass = np.sqrt(np.sum(datam*datam, axis=1))
Первоначальная идея состояла в том, чтобы отправить результаты от работников на специального работника хранения через очередь. К сожалению, это значительно снизило производительность из -за блокировки и сериализации. Самым быстрым вариантом для реализации было написать результаты на диск непосредственно от работника.
output_dir = "/var/tmp/" def processor_fun(*, i_start: int, i_end: int) -> None: """ Calculate correlation of rows in the range of i_start and i_end and the rows below these indices. Args: i_start (int): Index of the start row i_end (int): Index of the end row Returns: None """ for i in range(i_start, i_end): temp = np.dot(datam[i:], datam[i].T) rs = temp / (datass[i:] * datass[i]) rs = np.delete(rs, [0]) # Create nparray that contains information about the indices res = np.zeros((rs.shape[0], 3)) res[:, 0] = i res[:, 1] = list(range(i+1, i + rs.shape[0] + 1)) res[:, 2] = rs # Save CSV file np.savetxt(f'{output_dir}/{i}.csv', res, delimiter=',', newline='\n', fmt=['%d','%d','%0.13f']) # Create pool of worker processes which will carry out the # computation n_cpus = mp.cpu_count() pool = mp.Pool(n_cpus) # Start workers s_indices = [0] + split_indices workers = [] for i in range(0, len(s_indices)): start = s_indices[i] end = s_indices[i+1] if i < len(s_indices)-1 else n-1 workers.append(pool.apply_async( processor_fun, kwds={'i_start': start, 'i_end': end})) for r in workers: r.wait() # Close the pool and wait till all workers are done pool.close() pool.terminate() pool.join()
С помощью нашего набора данных это создает 480K CSV -файлы с общим размером ок. 4TB. Мы приказались, и GZIP сжал результаты до частей ~ 4 ГБ и загрузили их в Google BigQuery для производства.
Оригинал: “https://dev.to/linuskohl/computing-the-pearson-correlation-matrix-on-huge-datasets-in-python-561”