# -*- coding: utf-8 -*-
import random
import uuid
from functools import reduce
from dummy_spark.resultsiterable import ResultIterable
__author__ = 'willmcginnis'
class RDD(object):
    """
    A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. It represents an immutable, partitioned
    collection of elements that can be operated on in parallel. This is a dummy version of that, that is just a list
    under the hood. To be used for testing, and maybe development if you play fast and loose.

    Important note: the dummy RDD is NOT lazily loaded. Every transformation is evaluated eagerly and returns a new
    RDD wrapping a plain Python list; there is always exactly one "partition".
    """

    def __init__(self, jrdd, ctx, jrdd_deserializer=None):
        """
        Wrap an in-memory collection as a dummy RDD.

        :param jrdd: the backing data; ``None`` (treated as empty), a ``list`` (stored by reference, not copied)
            or a ``set`` (converted to a list)
        :param ctx: the context object this RDD belongs to; stored as-is and handed to derived RDDs
        :param jrdd_deserializer: stored on the instance but otherwise unused
        :raises AttributeError: if ``jrdd`` is not ``None``, a list or a set
        """
        # ported
        self._id = str(uuid.uuid4())
        if jrdd is None:
            self._jrdd = []
        elif isinstance(jrdd, list):
            self._jrdd = jrdd
        elif isinstance(jrdd, set):
            self._jrdd = list(jrdd)
        else:
            raise AttributeError('Type %s for jrdd not supported' % (type(jrdd), ))

        self.ctx = ctx
        self.is_cached = True
        self._name = 'dummpy-rdd'

        # not ported
        self.is_checkpointed = False
        self._jrdd_deserializer = jrdd_deserializer
        self.partitioner = None

    def id(self):
        """
        :return: the random UUID string assigned to this RDD at construction
        """
        return self._id

    @property
    def context(self):
        """
        :return: the context object passed to the constructor
        """
        return self.ctx

    def name(self):
        """
        :return: the current name of this RDD
        """
        return self._name

    def setName(self, name):
        """
        :param name: the new name for this RDD
        :return: this RDD, to allow chaining
        """
        self._name = name
        return self

    def __repr__(self):
        """
        :return: the string form of the underlying list
        """
        return str(self._jrdd)

    def cache(self):
        """
        No-op; the data already lives in memory.

        :return: this RDD
        """
        return self

    def persist(self, storageLevel=None):
        """
        No-op.

        :param storageLevel: ignored
        :return: this RDD
        """
        return self

    def unpersist(self):
        """
        No-op.

        :return: this RDD
        """
        return self

    def _reserialize(self, serializer=None):
        """
        No-op.

        :param serializer: ignored
        :return: this RDD
        """
        return self

    def checkpoint(self):
        """
        No-op; does not touch ``is_checkpointed``.

        :return: None
        """
        pass

    def isCheckpointed(self):
        """
        :return: always ``True``, independent of the ``is_checkpointed`` attribute
        """
        return True

    def getCheckpointFile(self):
        """
        :return: always ``None``
        """
        return None

    def map(self, f, preservesPartitioning=False):
        """
        Apply ``f`` to every element.

        :param f: function of one element
        :param preservesPartitioning: ignored
        :return: a new RDD holding the results, in the same order
        """
        data = list(map(f, self._jrdd))
        return RDD(data, self.ctx)

    def flatMap(self, f, preservesPartitioning=False):
        """
        Apply ``f`` to every element and concatenate the iterables it returns.

        :param f: function of one element returning an iterable
        :param preservesPartitioning: ignored
        :return: a new RDD holding the flattened results
        """
        data = [item for sl in map(f, self._jrdd) for item in sl]
        return RDD(data, self.ctx)

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        Delegates to :meth:`map`, so ``f`` is called once per element (not once per partition iterator).

        :param f: function of one element
        :param preservesPartitioning: forwarded to :meth:`map`, which ignores it
        :return: a new RDD holding the results
        """
        return self.map(f, preservesPartitioning=preservesPartitioning)

    def getNumPartitions(self):
        """
        :return: always 1
        """
        return 1

    def filter(self, f):
        """
        Keep the elements for which ``f`` is truthy.

        :param f: predicate of one element
        :return: a new RDD holding the retained elements
        """
        data = list(filter(f, self._jrdd))
        return RDD(data, self.ctx)

    def distinct(self, numPartitions=None):
        """
        Remove duplicates by passing the data through a ``set`` (elements must be hashable; order is not kept).

        :param numPartitions: ignored
        :return: a new RDD holding the unique elements
        """
        data = set(self._jrdd)
        return RDD(data, self.ctx)

    def sample(self, withReplacement, fraction, seed=None):
        """
        Draw ``int(fraction * count)`` elements at random.

        :param withReplacement: if truthy, the same element may be drawn more than once
        :param fraction: non-negative fraction of the RDD size to draw
        :param seed: if not ``None``, seeds the module-level ``random`` generator first
        :return: a new RDD holding the sampled elements
        :raises AssertionError: if ``fraction`` is negative
        """
        assert fraction >= 0.0, "Negative fraction value: %s" % fraction

        if seed is not None:
            random.seed(seed)

        idx_list = list(range(len(self._jrdd)))
        sample_size = int(fraction * len(self._jrdd))
        if withReplacement:
            data = [self._jrdd[random.choice(idx_list)] for _ in range(sample_size)]
        else:
            random.shuffle(idx_list)
            data = [self._jrdd[idx] for idx in idx_list[:sample_size]]
        return RDD(data, self.ctx)

    def randomSplit(self, weights, seed=None):
        """
        Not implemented: does nothing.

        :param weights: ignored
        :param seed: ignored
        :return: None
        """
        pass

    def takeSample(self, withReplacement, num, seed=None):
        """
        Draw up to ``num`` elements at random and return them as a list.

        :param withReplacement: if truthy, draw exactly ``num`` elements allowing repeats; otherwise draw at most
            ``count`` distinct positions
        :param num: non-negative number of elements to draw
        :param seed: if not ``None``, seeds the module-level ``random`` generator first
        :return: a plain list of sampled elements (empty if the RDD is empty)
        :raises AssertionError: if ``num`` is negative
        """
        assert num >= 0.0, "Negative sample num: %s" % num

        if seed is not None:
            random.seed(seed)

        # Nothing to draw from; also avoids random.choice() on an empty sequence.
        if not self._jrdd:
            return []

        idx_list = list(range(len(self._jrdd)))
        if withReplacement:
            # range(num), not "for _ in num": num is an int and is not iterable.
            out = [self._jrdd[random.choice(idx_list)] for _ in range(num)]
        else:
            random.shuffle(idx_list)
            out = [self._jrdd[idx] for idx in idx_list[:num]]
        return out

    def union(self, other):
        """
        :param other: another dummy RDD
        :return: a new RDD holding this RDD's elements followed by ``other``'s (duplicates kept)
        """
        return RDD(self._jrdd + other._jrdd, self.ctx)

    def intersection(self, other):
        """
        :param other: another dummy RDD
        :return: a new RDD holding the elements of this RDD that also occur in ``other`` (duplicates in this RDD
            are kept)
        """
        data = [item for item in self._jrdd if item in other._jrdd]
        return RDD(data, self.ctx)

    def __add__(self, other):
        """
        ``rdd + other`` is the same as ``rdd.union(other)``.

        :param other: another dummy RDD
        :return: the union RDD
        :raises TypeError: if ``other`` is not an RDD
        """
        if not isinstance(other, RDD):
            raise TypeError
        return self.union(other)

    def repartitionAndSortWithinPartitions(self, numPartitions=None, partitionFunc=None, ascending=True, keyfunc=lambda x: x):
        """
        Sort the whole (single-partition) RDD.

        :param numPartitions: ignored
        :param partitionFunc: ignored
        :param ascending: sort smallest-first when truthy, largest-first otherwise
        :param keyfunc: sort key, applied to each whole element
        :return: a new, sorted RDD
        """
        # reverse must be the negation of ascending; passing ascending directly inverts the order.
        data = sorted(self._jrdd, key=keyfunc, reverse=not ascending)
        return RDD(data, self.ctx)

    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
        """
        Sort the RDD.

        :param ascending: sort smallest-first when truthy, largest-first otherwise
        :param numPartitions: ignored
        :param keyfunc: sort key, applied to each whole element (with the default identity function, (key, value)
            tuples are compared as tuples)
        :return: a new, sorted RDD
        """
        # reverse must be the negation of ascending; passing ascending directly inverts the order.
        data = sorted(self._jrdd, key=keyfunc, reverse=not ascending)
        return RDD(data, self.ctx)

    def sortBy(self, keyfunc, ascending=True, numPartitions=None):
        """
        Sort the RDD by ``keyfunc``.

        :param keyfunc: sort key, applied to each element
        :param ascending: sort smallest-first when truthy, largest-first otherwise
        :param numPartitions: ignored
        :return: a new, sorted RDD
        """
        # reverse must be the negation of ascending; passing ascending directly inverts the order.
        data = sorted(self._jrdd, key=keyfunc, reverse=not ascending)
        return RDD(data, self.ctx)

    def glom(self):
        """
        :return: the underlying list itself (a plain list, not an RDD)
        """
        return self._jrdd

    def cartesian(self, other):
        """
        :param other: another dummy RDD
        :return: a new RDD of every ``(t, u)`` pair with ``t`` from this RDD and ``u`` from ``other``
        """
        data = [(t, u) for t in self._jrdd for u in other._jrdd]
        return RDD(data, self.ctx)

    def groupBy(self, f, numPartitions=None):
        """
        Key every element by ``f(element)`` and group by that key via :meth:`groupByKey`.

        :param f: function computing the grouping key of an element
        :param numPartitions: forwarded to :meth:`groupByKey`
        :return: a new RDD of ``(key, grouped elements)`` pairs
        """
        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

    def foreach(self, f):
        """
        Call ``f`` on every element (implemented through :meth:`map`).

        :param f: function of one element
        :return: the RDD of ``f``'s return values
        """
        return self.map(f)

    def foreachPartition(self, f):
        """
        Call ``f`` once with the whole underlying list.

        :param f: function taking the list of elements
        :return: whatever ``f`` returns
        """
        return f(self._jrdd)

    def collect(self):
        """
        :return: the underlying list itself (not a copy)
        """
        return self._jrdd

    def sum(self):
        """
        :return: the builtin ``sum`` of the elements
        """
        return sum(self._jrdd)

    def count(self):
        """
        :return: the number of elements
        """
        return len(self._jrdd)

    def mean(self):
        """
        :return: the arithmetic mean of the elements as a float
        :raises ZeroDivisionError: if the RDD is empty
        """
        return float(sum(self._jrdd)) / len(self._jrdd)

    def take(self, num):
        """
        :param num: maximum number of elements to return
        :return: a list of the first ``num`` elements
        """
        return self._jrdd[:num]

    def first(self):
        """
        :return: the first element
        :raises IndexError: if the RDD is empty
        """
        return self._jrdd[0]

    def isEmpty(self):
        """
        :return: ``True`` if the RDD holds no elements
        """
        return len(self._jrdd) == 0

    def reduceByKey(self, func, numPartitions=None):
        """
        Combine the values of each key with ``func``. Elements must be ``(key, value)`` pairs with hashable keys.

        :param func: associative function of two values
        :param numPartitions: ignored
        :return: a new RDD of ``(key, reduced value)`` pairs, one per distinct key
        """
        keys = {kv[0] for kv in self._jrdd}
        data = [(key, reduce(func, [kv[1] for kv in self._jrdd if kv[0] == key])) for key in keys]
        return RDD(data, self.ctx)

    # TODO: support variant with custom partitioner
    def groupByKey(self, numPartitions=None):
        """
        Collect the values of each key. Elements must be ``(key, value)`` pairs with hashable keys.

        :param numPartitions: ignored
        :return: a new RDD of ``(key, ResultIterable of values)`` pairs, one per distinct key
        """
        keys = {x[0] for x in self._jrdd}
        out = {k: ResultIterable([x[1] for x in self._jrdd if x[0] == k]) for k in keys}
        data = list(out.items())
        return RDD(data, self.ctx)

    def flatMapValues(self, f):
        """
        Apply ``f`` to the value of each ``(key, value)`` pair and emit one ``(key, x)`` pair per item it returns.

        :param f: function of one value returning an iterable
        :return: a new RDD of the flattened pairs
        """
        flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
        return self.flatMap(flat_map_fn, preservesPartitioning=True)

    def mapValues(self, f):
        """
        Apply ``f`` to the value of each ``(key, value)`` pair, keeping the key.

        :param f: function of one value
        :return: a new RDD of ``(key, f(value))`` pairs
        """
        map_values_fn = lambda kv: (kv[0], f(kv[1]))
        return self.map(map_values_fn, preservesPartitioning=True)

    def cogroup(self, other, numPartitions=None):
        """
        Group the values of both RDDs by key. Both must hold ``(key, value)`` pairs with hashable keys.

        :param other: another dummy RDD
        :param numPartitions: ignored
        :return: a new RDD of 3-tuples ``(key, [values from self], [values from other])``, one per key present in
            either RDD
        """
        vs = {x[0] for x in self._jrdd}
        us = {x[0] for x in other._jrdd}
        keys = vs.union(us)
        data = [
            (
                k,
                ([v[1] for v in self._jrdd if v[0] == k]),
                ([u[1] for u in other._jrdd if u[0] == k])
            )
            for k in keys
        ]
        return RDD(data, self.ctx)

    def zip(self, other):
        """
        Pair up elements of this RDD with elements of ``other`` position by position.

        :param other: another dummy RDD (a plain iterable is also accepted)
        :return: a new RDD of ``(element of self, element of other)`` pairs, truncated to the shorter input
        """
        # An RDD is not itself iterable, so unwrap its backing list before zipping.
        other_items = other._jrdd if isinstance(other, RDD) else other
        data = list(zip(self._jrdd, other_items))
        return RDD(data, self.ctx)

    def zipWithIndex(self):
        """
        :return: a new RDD of ``(element, index)`` pairs, indices starting at 0
        """
        data = [(b, a) for a, b in enumerate(self._jrdd)]
        return RDD(data, self.ctx)

    def _defaultReducePartitions(self):
        """
        :return: always 1
        """
        return 1

    def lookup(self, key):
        """
        :param key: the key to search for
        :return: a list of the elements (whole pairs, not just values) whose first item equals ``key``
        """
        return [x for x in self._jrdd if x[0] == key]

    def countApprox(self, timeout, confidence=0.95):
        """
        :param timeout: ignored
        :param confidence: ignored
        :return: the exact number of elements
        """
        return len(self._jrdd)

    def sumApprox(self, timeout, confidence=0.95):
        """
        :param timeout: ignored
        :param confidence: ignored
        :return: the exact sum of the elements
        """
        return sum(self._jrdd)

    def meanApprox(self, timeout, confidence=0.95):
        """
        :param timeout: ignored
        :param confidence: ignored
        :return: the exact mean of the elements as a float
        :raises ZeroDivisionError: if the RDD is empty
        """
        return float(sum(self._jrdd)) / len(self._jrdd)

    def countApproxDistinct(self, relativeSD=0.05):
        """
        :param relativeSD: ignored
        :return: the exact number of distinct (hashable) elements
        """
        return len(set(self._jrdd))

    def toLocalIterator(self):
        """
        :return: a generator over the elements
        """
        for row in self._jrdd:
            yield row

    def max(self, key=None):
        """
        :param key: optional function computing the comparison key of an element
        :return: the largest element
        :raises ValueError: if the RDD is empty
        """
        if key is None:
            return max(self._jrdd)
        return max(self._jrdd, key=key)

    def min(self, key=None):
        """
        :param key: optional function computing the comparison key of an element
        :return: the smallest element
        :raises ValueError: if the RDD is empty
        """
        if key is None:
            return min(self._jrdd)
        return min(self._jrdd, key=key)

    # ------------------------------------------------------------------
    # Everything below is part of the Spark RDD interface but has no dummy
    # implementation yet: each method unconditionally raises
    # NotImplementedError and ignores its arguments.
    # ------------------------------------------------------------------

    def _pickled(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    @staticmethod
    def _computeFractionForSampleSize(sampleSizeLowerBound, total, withReplacement):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def pipe(self, command, env=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def reduce(self, f):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def treeReduce(self, f, depth=2):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def fold(self, zeroValue, op):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def aggregate(self, zeroValue, seqOp, combOp):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def treeAggregate(self, zeroValue, seqOp, combOp, depth=2):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def stats(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def histogram(self, buckets):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def variance(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def stdev(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def sampleStdev(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def sampleVariance(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def countByValue(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def top(self, num, key=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def takeOrdered(self, num, key=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None, compressionCodecClass=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def saveAsSequenceFile(self, path, compressionCodecClass=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def saveAsPickleFile(self, path, batchSize=10):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def saveAsTextFile(self, path, compressionCodecClass=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def collectAsMap(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def keys(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def values(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def reduceByKeyLocally(self, func):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def countByKey(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def join(self, other, numPartitions=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def leftOuterJoin(self, other, numPartitions=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def rightOuterJoin(self, other, numPartitions=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def fullOuterJoin(self, other, numPartitions=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def partitionBy(self, numPartitions, partitionFunc=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def combineByKey(self, createCombiner, mergeValue, mergeCombiners, numPartitions=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def foldByKey(self, zeroValue, func, numPartitions=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def _can_spill(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def _memory_limit(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def groupWith(self, other, *others):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def sampleByKey(self, withReplacement, fractions, seed=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def subtractByKey(self, other, numPartitions=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def subtract(self, other, numPartitions=None):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def keyBy(self, f):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def repartition(self, numPartitions):
        """
        No-op; the dummy RDD always has a single partition.

        :param numPartitions: ignored
        :return: this RDD
        """
        return self

    def coalesce(self, numPartitions, shuffle=False):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def zipWithUniqueId(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def toDebugString(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def getStorageLevel(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError

    def _to_java_object_rdd(self):
        """Not implemented in the dummy RDD; always raises NotImplementedError."""
        raise NotImplementedError