PySpark performance issue spark 1.5.2 дистрибьюторы cloudera

При выполнении сценария PySpark возникают некоторые проблемы с производительностью:

import os
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext, HiveContext
from pyspark.sql.types import Row


def append_columns(row, dct):
    """
    :param row:
    :param dct:
    :type row:Row
    :type dct:dict
    :return:
    """
    schema = row.asDict()
    schema.update(dct)
    return Row(**schema)


def append_column(row, key, value):
    """
    :param row:
    :param key:
    :param value:
    :type row:Row
    :type key: str
    :return:
    """
    schema = row.asDict()
    if key not in schema:
        schema[key] = value
    res = Row(**schema)
    return res


class Components(object):
    def __init__(self):
        pass

    def first_components(self, row, key):
        """
        :param row:
        :param key:
        :type row: Row
        :type key:lambda for example labmda x: x.description
        :return:
        """
        pass

    def join(self, row, dict_other, key):
        """
        some logic with join
        :param row:
        :param dict_other:
        :param key:
        :return:
        :rtype: list
        was realized joining logic like "one to many" multiply per row ~150->1500

        """
        pass

    def some_action(self, x, key):
        pass


def append_category(row, key, is_exists_category, type_category):
    comp = Components()
    if int(is_exists_category) == 1:
        type_category = int(type_category)
        if type_category == 1:
            return append_column(row, "component", comp.first_components(row, key))
        elif type_category == 2:
            """
            copy paste
            """
            return append_column(row, "component", comp.first_components(row, key))
    else:
        return row


comp = Components()
conf = SparkConf()
sc = SparkContext(conf=conf)
sql = SQLContext(sparkContext=sc)
query = HiveContext(sparkContext=sc)
first = sql.parquetFile("some/path/to/parquetfile").rdd.collectAsMap()
first = sc.broadcast(first)
key = lambda x: x.description
"""sec has from select 2k rows"""
sec = query.sql("select bla, bla1, description from some_one").rdd 
    .filter(lambda x: x.bla1 > 10) 
    .map(lambda x: append_category(x, key, 1, 1)) 
    .map(lambda x: append_column(x, "hole_size", comp.some_action(x, key))) 
    .flatMap(lambda x: comp.join(x, first.value, key)) 
    .filter(lambda x: x)
table = 'db.some_one'
query.sql("DROP TABLE IF EXISTS {tbl}".format(tbl=table + "_test"))
query.createDataFrame(sec, samplingRatio=10).saveAsTable("{tbl}".format(tbl=table + "_dcka"), mode='overwrite',
                                                         path=os.path.join("some/path/to/",
                                                                           table.split('.')[1] + "_test"))

Конфигурация Spark:

  • 6 исполнителей
  • 2 ГБ на одного исполнителя

Этот скрипт работает почти 5 часов и история Spark показывает загрузку только на одном исполнителе. Секционирование не имеет никакого эффекта.

1 ответ

  1. Вы можете попробовать упростить свою логику бит за битом:

    rdd1 = query.sql("select bla, bla1, description from some_one").rdd
    rdd2 = sql.parquetFile("some/path/to/parquetfile").rdd
    rdd1.join (rdd2) 
    

    затем добавьте фильтрацию, а затем широковещательное соединение, если производительность отстой

    Вы можете контролировать количество ваших разделов через ‘ rdd.перекрытия.размер’, количество ваших разделов должно примерно соответствовать количеству ядер в целом кластере, чтобы все исполнители были вовлечены в обработку