Source code for dummy_spark.rdd

# -*- coding: utf-8 -*-

import random
import uuid
from functools import reduce
from dummy_spark.resultsiterable import ResultIterable

__author__ = 'willmcginnis'


class RDD(object):
    """
    A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. It represents an immutable,
    partitioned collection of elements that can be operated on in parallel. This is a dummy version of that,
    which is just a list under the hood. It is meant for testing, and maybe for development if you play fast
    and loose.

    Important note: the dummy RDD is NOT lazily evaluated.
    """

    def __init__(self, jrdd, ctx, jrdd_deserializer=None):
        """
        :param jrdd: the underlying data, as a list or a set (None gives an empty RDD)
        :param ctx: the context this RDD belongs to
        :param jrdd_deserializer: kept for API compatibility; unused here
        :return:
        """

        # ported
        self._id = str(uuid.uuid4())
        if jrdd is None:
            self._jrdd = []
        else:
            if isinstance(jrdd, list):
                self._jrdd = jrdd
            elif isinstance(jrdd, set):
                self._jrdd = list(jrdd)
            else:
                raise AttributeError('Type %s for jrdd not supported' % (type(jrdd), ))

        self.ctx = ctx
        self.is_cached = True
        self._name = 'dummy-rdd'

        # not ported
        self.is_checkpointed = False
        self._jrdd_deserializer = jrdd_deserializer
        self.partitioner = None
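    # A minimal usage sketch. Note that `sc` below is a hypothetical stand-in for
    # whatever context object you construct (e.g. dummy_spark's SparkContext); it
    # is an assumption for illustration, not defined in this module:
    #
    #   rdd = RDD([1, 2, 3, 4], sc)
    #   rdd.count()    # -> 4
    #   rdd.collect()  # -> [1, 2, 3, 4]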
    def id(self):
        """
        :return:
        """
        return self._id
    @property
    def context(self):
        """
        :return:
        """
        return self.ctx
    def name(self):
        """
        :return:
        """
        return self._name
    def setName(self, name):
        """
        :param name:
        :return:
        """
        self._name = name
        return self
    def __repr__(self):
        """
        :return:
        """
        return str(self._jrdd)
    def cache(self):
        """
        :return:
        """
        # no-op: the dummy RDD is always materialized in memory
        return self

    def persist(self, storageLevel=None):
        """
        :param storageLevel:
        :return:
        """
        # no-op: storage levels are meaningless for a plain list
        return self

    def unpersist(self):
        """
        :return:
        """
        return self

    def _reserialize(self, serializer=None):
        """
        :param serializer:
        :return:
        """
        return self
    def checkpoint(self):
        """
        :return:
        """
        pass

    def isCheckpointed(self):
        """
        :return:
        """
        return True

    def getCheckpointFile(self):
        """
        :return:
        """
        return None
    def map(self, f, preservesPartitioning=False):
        """
        :param f:
        :param preservesPartitioning:
        :return:
        """
        data = list(map(f, self._jrdd))
        return RDD(data, self.ctx)
    def flatMap(self, f, preservesPartitioning=False):
        """
        :param f:
        :param preservesPartitioning:
        :return:
        """
        data = [item for sl in map(f, self._jrdd) for item in sl]
        return RDD(data, self.ctx)
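    # For example (with `sc` the hypothetical context from above): map transforms
    # each element one-to-one, while flatMap flattens one level of f's output:
    #
    #   RDD([1, 2, 3], sc).map(lambda x: x * 2).collect()         # -> [2, 4, 6]
    #   RDD([1, 2], sc).flatMap(lambda x: [x, x * 10]).collect()  # -> [1, 10, 2, 20]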
    def mapPartitions(self, f, preservesPartitioning=False):
        """
        :param f:
        :param preservesPartitioning:
        :return:
        """
        # the dummy RDD has a single partition, so f is applied once to an
        # iterator over the whole dataset (not once per element, as map does)
        data = list(f(iter(self._jrdd)))
        return RDD(data, self.ctx)
    def getNumPartitions(self):
        """
        :return:
        """
        return 1
    def filter(self, f):
        """
        :param f:
        :return:
        """
        data = list(filter(f, self._jrdd))
        return RDD(data, self.ctx)
    def distinct(self, numPartitions=None):
        """
        :param numPartitions:
        :return:
        """
        data = set(self._jrdd)
        return RDD(data, self.ctx)
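    # For example, filter keeps elements where f is truthy, and distinct removes
    # duplicates (the result goes through a set, so its order is arbitrary):
    #
    #   RDD([1, 2, 3, 4], sc).filter(lambda x: x % 2 == 0).collect()  # -> [2, 4]
    #   sorted(RDD([1, 1, 2], sc).distinct().collect())               # -> [1, 2]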
    def sample(self, withReplacement, fraction, seed=None):
        """
        :param withReplacement:
        :param fraction:
        :param seed:
        :return:
        """
        assert fraction >= 0.0, "Negative fraction value: %s" % fraction
        if seed is not None:
            random.seed(seed)

        idx_list = list(range(len(self._jrdd)))
        if withReplacement:
            data = [self._jrdd[random.choice(idx_list)] for _ in range(int(fraction * len(self._jrdd)))]
        else:
            random.shuffle(idx_list)
            data = [self._jrdd[idx] for idx in idx_list[:int(fraction * len(self._jrdd))]]
        return RDD(data, self.ctx)
    def randomSplit(self, weights, seed=None):
        """
        :param weights:
        :param seed:
        :return:
        """
        # not implemented in the dummy; returns None rather than a list of RDDs
        pass
    def takeSample(self, withReplacement, num, seed=None):
        """
        :param withReplacement:
        :param num:
        :param seed:
        :return:
        """
        assert num >= 0, "Negative sample num: %s" % num
        if seed is not None:
            random.seed(seed)

        if withReplacement:
            out = [self._jrdd[random.choice(list(range(len(self._jrdd))))] for _ in range(num)]
        else:
            idx_list = list(range(len(self._jrdd)))
            random.shuffle(idx_list)
            out = [self._jrdd[idx] for idx in idx_list[:num]]
        return out
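    # For example, sample returns a new RDD of roughly fraction * count elements,
    # while takeSample returns a plain list of num elements; both are repeatable
    # for a fixed seed (again assuming the hypothetical `sc` from above):
    #
    #   RDD(list(range(10)), sc).sample(False, 0.5, seed=42).count()  # -> 5
    #   len(RDD(list(range(10)), sc).takeSample(False, 3, seed=42))   # -> 3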
    def union(self, other):
        """
        :param other:
        :return:
        """
        return RDD(self._jrdd + other._jrdd, self.ctx)
    def intersection(self, other):
        """
        :param other:
        :return:
        """
        data = [item for item in self._jrdd if item in other._jrdd]
        return RDD(data, self.ctx)
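    # For example:
    #
    #   RDD([1, 2], sc).union(RDD([3], sc)).collect()                   # -> [1, 2, 3]
    #   RDD([1, 2, 3], sc).intersection(RDD([2, 3, 4], sc)).collect()   # -> [2, 3]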
    def __add__(self, other):
        """
        :param other:
        :return:
        """
        if not isinstance(other, RDD):
            raise TypeError
        return self.union(other)
    def repartitionAndSortWithinPartitions(self, numPartitions=None, partitionFunc=None, ascending=True, keyfunc=lambda x: x):
        """
        :param numPartitions:
        :param partitionFunc:
        :param ascending:
        :param keyfunc:
        :return:
        """
        # sort by keyfunc applied to each key; reverse is the negation of ascending
        data = sorted(self._jrdd, key=lambda kv: keyfunc(kv[0]), reverse=not ascending)
        return RDD(data, self.ctx)
    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
        """
        :param ascending:
        :param numPartitions:
        :param keyfunc:
        :return:
        """
        # sort by keyfunc applied to each key; ascending=True puts smallest keys first
        data = sorted(self._jrdd, key=lambda kv: keyfunc(kv[0]), reverse=not ascending)
        return RDD(data, self.ctx)
    def sortBy(self, keyfunc, ascending=True, numPartitions=None):
        """
        :param keyfunc:
        :param ascending:
        :param numPartitions:
        :return:
        """
        data = sorted(self._jrdd, key=keyfunc, reverse=not ascending)
        return RDD(data, self.ctx)
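    # For example, with ascending=True (the default) the smallest keys come first:
    #
    #   RDD([3, 1, 2], sc).sortBy(lambda x: x).collect()      # -> [1, 2, 3]
    #   RDD([('b', 2), ('a', 1)], sc).sortByKey().collect()   # -> [('a', 1), ('b', 2)]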
    def glom(self):
        """
        :return:
        """
        # the dummy RDD has a single partition, so glom yields an RDD containing
        # one list of all the elements
        return RDD([self._jrdd], self.ctx)
    def cartesian(self, other):
        """
        :param other:
        :return:
        """
        data = [(t, u) for t in self._jrdd for u in other._jrdd]
        return RDD(data, self.ctx)
    def groupBy(self, f, numPartitions=None):
        """
        :param f:
        :param numPartitions:
        :return:
        """
        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
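    # For example, groupBy keys each element by f(x) and groups the originals
    # (values come back wrapped in a ResultIterable, which is iterable):
    #
    #   rdd = RDD([1, 2, 3, 4], sc).groupBy(lambda x: x % 2)
    #   sorted((k, list(v)) for k, v in rdd.collect())  # -> [(0, [2, 4]), (1, [1, 3])]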
    def foreach(self, f):
        """
        :param f:
        :return:
        """
        # since the dummy RDD is not lazy, map applies f immediately
        return self.map(f)
    def foreachPartition(self, f):
        """
        :param f:
        :return:
        """
        return f(self._jrdd)
    def collect(self):
        """
        :return:
        """
        return self._jrdd

    def sum(self):
        """
        :return:
        """
        return sum(self._jrdd)

    def count(self):
        """
        :return:
        """
        return len(self._jrdd)

    def mean(self):
        """
        :return:
        """
        return float(sum(self._jrdd)) / len(self._jrdd)

    def take(self, num):
        """
        :param num:
        :return:
        """
        return self._jrdd[:num]

    def first(self):
        """
        :return:
        """
        return self._jrdd[0]

    def isEmpty(self):
        """
        :return:
        """
        return len(self._jrdd) == 0
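    # For example, these actions return plain Python values rather than RDDs:
    #
    #   rdd = RDD([1, 2, 3, 4], sc)
    #   rdd.sum()    # -> 10
    #   rdd.mean()   # -> 2.5
    #   rdd.take(2)  # -> [1, 2]
    #   rdd.first()  # -> 1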
    def reduceByKey(self, func, numPartitions=None):
        """
        :param func:
        :param numPartitions:
        :return:
        """
        # TODO: support variant with custom partitioner
        keys = {kv[0] for kv in self._jrdd}
        data = [(key, reduce(func, [kv[1] for kv in self._jrdd if kv[0] == key])) for key in keys]
        return RDD(data, self.ctx)
    def groupByKey(self, numPartitions=None):
        """
        :param numPartitions:
        :return:
        """
        keys = {x[0] for x in self._jrdd}
        out = {k: ResultIterable([x[1] for x in self._jrdd if x[0] == k]) for k in keys}
        data = list(out.items())
        return RDD(data, self.ctx)
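    # For example, on a key-value RDD:
    #
    #   pairs = RDD([('a', 1), ('b', 2), ('a', 3)], sc)
    #   sorted(pairs.reduceByKey(lambda x, y: x + y).collect())  # -> [('a', 4), ('b', 2)]
    #   # groupByKey wraps each key's values in a ResultIterable:
    #   sorted((k, list(v)) for k, v in pairs.groupByKey().collect())  # -> [('a', [1, 3]), ('b', [2])]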
    def flatMapValues(self, f):
        """
        :param f:
        :return:
        """
        flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
        return self.flatMap(flat_map_fn, preservesPartitioning=True)
    def mapValues(self, f):
        """
        :param f:
        :return:
        """
        map_values_fn = lambda kv: (kv[0], f(kv[1]))
        return self.map(map_values_fn, preservesPartitioning=True)
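    # For example, both operate on the value while leaving the key untouched:
    #
    #   pairs = RDD([('a', [1, 2]), ('b', [3])], sc)
    #   sorted(pairs.mapValues(len).collect())              # -> [('a', 2), ('b', 1)]
    #   sorted(pairs.flatMapValues(lambda v: v).collect())  # -> [('a', 1), ('a', 2), ('b', 3)]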
    def cogroup(self, other, numPartitions=None):
        """
        :param other:
        :param numPartitions:
        :return:
        """
        vs = {x[0] for x in self._jrdd}
        us = {x[0] for x in other._jrdd}
        keys = vs.union(us)
        # pair each key with a tuple of (values from self, values from other),
        # matching Spark's (k, (vs, us)) shape
        data = [
            (
                k,
                (
                    [v[1] for v in self._jrdd if v[0] == k],
                    [u[1] for u in other._jrdd if u[0] == k]
                )
            )
            for k in keys
        ]
        return RDD(data, self.ctx)
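    # For example (here the grouped values come back as plain lists, not
    # ResultIterables as in real Spark):
    #
    #   x = RDD([('a', 1)], sc)
    #   y = RDD([('a', 2), ('b', 3)], sc)
    #   sorted(x.cogroup(y).collect())  # -> [('a', ([1], [2])), ('b', ([], [3]))]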
    def zip(self, other):
        """
        :param other:
        :return:
        """
        # pair this RDD's elements with the other RDD's, in order
        data = list(zip(self._jrdd, other._jrdd))
        return RDD(data, self.ctx)
    def zipWithIndex(self):
        """
        :return:
        """
        data = [(b, a) for a, b in enumerate(self._jrdd)]
        return RDD(data, self.ctx)
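    # For example:
    #
    #   RDD([1, 2, 3], sc).zip(RDD(['a', 'b', 'c'], sc)).collect()  # -> [(1, 'a'), (2, 'b'), (3, 'c')]
    #   RDD(['a', 'b'], sc).zipWithIndex().collect()                # -> [('a', 0), ('b', 1)]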
    def _defaultReducePartitions(self):
        """
        :return:
        """
        return 1

    def lookup(self, key):
        """
        :param key:
        :return:
        """
        # return the values for the given key, as in Spark
        return [x[1] for x in self._jrdd if x[0] == key]

    def countApprox(self, timeout, confidence=0.95):
        """
        :param timeout:
        :param confidence:
        :return:
        """
        # the dummy returns exact results; timeout and confidence are ignored,
        # here and in the other *Approx methods below
        return len(self._jrdd)

    def sumApprox(self, timeout, confidence=0.95):
        """
        :param timeout:
        :param confidence:
        :return:
        """
        return sum(self._jrdd)

    def meanApprox(self, timeout, confidence=0.95):
        """
        :param timeout:
        :param confidence:
        :return:
        """
        return float(sum(self._jrdd)) / len(self._jrdd)

    def countApproxDistinct(self, relativeSD=0.05):
        """
        :param relativeSD:
        :return:
        """
        return len(set(self._jrdd))

    def toLocalIterator(self):
        """
        :return:
        """
        for row in self._jrdd:
            yield row
    def max(self, key=None):
        """
        :param key:
        :return:
        """
        if key is None:
            return max(self._jrdd)
        else:
            raise NotImplementedError

    def min(self, key=None):
        """
        :param key:
        :return:
        """
        if key is None:
            return min(self._jrdd)
        else:
            raise NotImplementedError
    def _pickled(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        NotImplemented
        :param f:
        :param preservesPartitioning:
        :return:
        """
        raise NotImplementedError

    @staticmethod
    def _computeFractionForSampleSize(sampleSizeLowerBound, total, withReplacement):
        """
        NotImplemented
        :param sampleSizeLowerBound:
        :param total:
        :param withReplacement:
        :return:
        """
        raise NotImplementedError

    def pipe(self, command, env=None):
        """
        NotImplemented
        :param command:
        :param env:
        :return:
        """
        raise NotImplementedError

    def reduce(self, f):
        """
        NotImplemented
        :param f:
        :return:
        """
        raise NotImplementedError

    def treeReduce(self, f, depth=2):
        """
        NotImplemented
        :param f:
        :param depth:
        :return:
        """
        raise NotImplementedError

    def fold(self, zeroValue, op):
        """
        NotImplemented
        :param zeroValue:
        :param op:
        :return:
        """
        raise NotImplementedError

    def aggregate(self, zeroValue, seqOp, combOp):
        """
        NotImplemented
        :param zeroValue:
        :param seqOp:
        :param combOp:
        :return:
        """
        raise NotImplementedError

    def treeAggregate(self, zeroValue, seqOp, combOp, depth=2):
        """
        NotImplemented
        :param zeroValue:
        :param seqOp:
        :param combOp:
        :param depth:
        :return:
        """
        raise NotImplementedError

    def stats(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def histogram(self, buckets):
        """
        NotImplemented
        :param buckets:
        :return:
        """
        raise NotImplementedError

    def variance(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def stdev(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def sampleStdev(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def sampleVariance(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def countByValue(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def top(self, num, key=None):
        """
        NotImplemented
        :param num:
        :param key:
        :return:
        """
        raise NotImplementedError

    def takeOrdered(self, num, key=None):
        """
        NotImplemented
        :param num:
        :param key:
        :return:
        """
        raise NotImplementedError

    def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
        """
        NotImplemented
        :param conf:
        :param keyConverter:
        :param valueConverter:
        :return:
        """
        raise NotImplementedError

    def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None,
                               valueConverter=None, conf=None):
        """
        NotImplemented
        :param path:
        :param outputFormatClass:
        :param keyClass:
        :param valueClass:
        :param keyConverter:
        :param valueConverter:
        :param conf:
        :return:
        """
        raise NotImplementedError

    def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
        """
        NotImplemented
        :param conf:
        :param keyConverter:
        :param valueConverter:
        :return:
        """
        raise NotImplementedError

    def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None,
                         valueConverter=None, conf=None, compressionCodecClass=None):
        """
        NotImplemented
        :param path:
        :param outputFormatClass:
        :param keyClass:
        :param valueClass:
        :param keyConverter:
        :param valueConverter:
        :param conf:
        :param compressionCodecClass:
        :return:
        """
        raise NotImplementedError

    def saveAsSequenceFile(self, path, compressionCodecClass=None):
        """
        NotImplemented
        :param path:
        :param compressionCodecClass:
        :return:
        """
        raise NotImplementedError

    def saveAsPickleFile(self, path, batchSize=10):
        """
        NotImplemented
        :param path:
        :param batchSize:
        :return:
        """
        raise NotImplementedError

    def saveAsTextFile(self, path, compressionCodecClass=None):
        """
        NotImplemented
        :param path:
        :param compressionCodecClass:
        :return:
        """
        raise NotImplementedError

    def collectAsMap(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def keys(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def values(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def reduceByKeyLocally(self, func):
        """
        NotImplemented
        :param func:
        :return:
        """
        raise NotImplementedError

    def countByKey(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def join(self, other, numPartitions=None):
        """
        NotImplemented
        :param other:
        :param numPartitions:
        :return:
        """
        raise NotImplementedError

    def leftOuterJoin(self, other, numPartitions=None):
        """
        NotImplemented
        :param other:
        :param numPartitions:
        :return:
        """
        raise NotImplementedError

    def rightOuterJoin(self, other, numPartitions=None):
        """
        NotImplemented
        :param other:
        :param numPartitions:
        :return:
        """
        raise NotImplementedError

    def fullOuterJoin(self, other, numPartitions=None):
        """
        NotImplemented
        :param other:
        :param numPartitions:
        :return:
        """
        raise NotImplementedError

    def partitionBy(self, numPartitions, partitionFunc=None):
        """
        NotImplemented
        :param numPartitions:
        :param partitionFunc:
        :return:
        """
        raise NotImplementedError

    def combineByKey(self, createCombiner, mergeValue, mergeCombiners, numPartitions=None):
        """
        NotImplemented
        :param createCombiner:
        :param mergeValue:
        :param mergeCombiners:
        :param numPartitions:
        :return:
        """
        raise NotImplementedError

    def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
        """
        NotImplemented
        :param zeroValue:
        :param seqFunc:
        :param combFunc:
        :param numPartitions:
        :return:
        """
        raise NotImplementedError

    def foldByKey(self, zeroValue, func, numPartitions=None):
        """
        NotImplemented
        :param zeroValue:
        :param func:
        :param numPartitions:
        :return:
        """
        raise NotImplementedError

    def _can_spill(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def _memory_limit(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def groupWith(self, other, *others):
        """
        NotImplemented
        :param other:
        :param others:
        :return:
        """
        raise NotImplementedError

    def sampleByKey(self, withReplacement, fractions, seed=None):
        """
        NotImplemented
        :param withReplacement:
        :param fractions:
        :param seed:
        :return:
        """
        raise NotImplementedError

    def subtractByKey(self, other, numPartitions=None):
        """
        NotImplemented
        :param other:
        :param numPartitions:
        :return:
        """
        raise NotImplementedError

    def subtract(self, other, numPartitions=None):
        """
        NotImplemented
        :param other:
        :param numPartitions:
        :return:
        """
        raise NotImplementedError

    def keyBy(self, f):
        """
        NotImplemented
        :param f:
        :return:
        """
        raise NotImplementedError
    def repartition(self, numPartitions):
        """
        :param numPartitions:
        :return:
        """
        # no-op: the dummy RDD always has exactly one partition
        return self
    def coalesce(self, numPartitions, shuffle=False):
        """
        NotImplemented
        :param numPartitions:
        :param shuffle:
        :return:
        """
        raise NotImplementedError

    def zipWithUniqueId(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def toDebugString(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def getStorageLevel(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def _to_java_object_rdd(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError