Рубрики
Без рубрики

Pyspark – пример подсчета слов

Автор оригинала: Python Examples.

Pyspark – подсчет слова

В этом примере подсчета слов Pyspark мы узнаем, как посчитать вхождению уникальных слов в текстовой строке. Конечно, мы узнаем Map-Unize, основной шаг для изучения больших данных.

Python Program

import sys
 
from pyspark import SparkContext, SparkConf
 
if __name__ == "__main__":
	
	# create Spark context with necessary configuration
	sc = SparkContext("local","PySpark Word Count Exmaple")
	
	# read data from text file and split each line into words
	words = sc.textFile("D:/workspace/spark/input.txt").flatMap(lambda line: line.split(" "))
	
	# count the occurrence of each word
	wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a,b:a +b)
	
	# save the counts to output
	wordCounts.saveAsTextFile("D:/workspace/spark/output/")

Запустите это зажигание Python Spark.

> spark-submit pyspark_example.py

Если приложение работает без каких-либо ошибок, на выходе указана выходная папка. D:/Workspace/Spark/вывод/ Отказ

Если вы попытаетесь запустить приложение еще раз, вы можете получить ошибку на выходе консоли, как показано ниже.

Выход

py4j.protocol.Py4JJavaError: An error occurred while calling o44.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/D:/workspace/spark/output already exists
        at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
        at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:287)
        at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)

Это связано с тем, что во время нашего первого запуска создана выходная папка. Прежде чем попробовать это снова, вам нужно явно удалить выходную папку.

Проанализируйте ввод и вывод количества слов Pyspark

Давайте проанализируем ввод и вывод этого примера.

Мы предоставили следующие данные в текстовом файле ввода.

Python Lists allow us to hold items of heterogeneous types. In this article, we will learn how to create a list in Python; access the list items; find the number of items in the list, how to add an item to list; how to remove an item from the list; loop through list items; sorting a list, reversing a list; and many more transformation and aggregation actions on Python Lists.

В выходной папке вы заметите следующий список файлов.

Открыть часть-00000. Содержимое будет показано ниже:

('Python', 2)
('Lists', 1)
('allow', 1)
('us', 1)
('to', 5)
('hold', 1)
('items', 2)
('of', 2)
('heterogeneous', 1)
('types.', 1)
('In', 1)
('this', 1)
('article,', 1)
('we', 1)
('will', 1)
('learn', 1)
('how', 3)
('create', 1)
('a', 3)
('list', 3)
('in', 2)
('Python;', 1)
('access', 1)
('the', 4)
('items;', 2)
('find', 1)
('number', 1)
('list,', 2)
('add', 1)
('an', 2)
('item', 2)
('list;', 3)
('remove', 1)
('from', 1)
('loop', 1)
('through', 1)
('sorting', 1)
('reversing', 1)
('and', 2)
('many', 1)
('more', 1)
('transformation', 1)
('aggregation', 1)
('actions', 1)
('on', 1)
('Lists.', 1)

Что мы сделали в Pyspark Word Count?

Мы создали SparkContext для подключения драйвера, который проходит локально.

sc = SparkContext("local","PySpark Word Count Exmaple")

Затем мы прочитаем входной текстовый файл, используя переменную SPARCCONTEXT и создал плоский аппарат слов. Слова имеют тип pythonrdd.

words = sc.textFile("D:/workspace/spark/input.txt").flatMap(lambda line: line.split(" "))

Мы разделили слова, используя одно пространство в качестве сепаратора.

Затем мы рассмотрим каждое слово к ключу: значение пара Word: 1, 1 – количество вхождений.

words.map(lambda word: (word, 1))

Затем результат уменьшается ключом, что является словом, а значения добавляются.

reduceByKey(lambda a,b:a +b)

Результат сохраняется в текстовый файл.

wordCounts.saveAsTextFile("D:/workspace/spark/output/")

Резюме

Заключение этого руководства примеров Python, мы узнали, как посчитать вхождению уникальных слов, использующих Pyspark.