ddf_utils.chef.model package

ddf_utils.chef.model.chef

The Chef object

class ddf_utils.chef.model.chef.Chef(dag: ddf_utils.chef.model.dag.DAG = None, metadata=None, config=None, cooking=None, serving=None, recipe=None)

Bases: object

The chef API.

add_config(**config)

Add configs. All keyword arguments are added to the config dictionary, replacing any existing entries with the same key.
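The add-or-replace behaviour described above is the same as a plain dictionary update. The following is a minimal sketch that assumes the config is stored as an ordinary dict. SketchChef and the config keys used here are hypothetical and are not the ddf_utils implementation.

```python
class SketchChef:
    """Hypothetical stand-in that mimics only add_config's merge behaviour."""

    def __init__(self, config=None):
        # assumption: the config is kept as a plain dict
        self.config = dict(config or {})

    def add_config(self, **config):
        # new keys are added; existing keys are overwritten
        self.config.update(config)


chef = SketchChef(config={"ddf_dir": "./datasets", "recipes_dir": "./recipes"})
chef.add_config(ddf_dir="/data/ddf", dry_run=True)
print(chef.config)
# {'ddf_dir': '/data/ddf', 'recipes_dir': './recipes', 'dry_run': True}
```

add_metadata() is documented with the same add/replace semantics for the metadata dictionary.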

add_dish(ingredients, options=None)
add_ingredient(**kwargs)

Add a new ingredient to the DAG.

Keyword arguments are collected into a dictionary and passed as the dictionary argument of the ddf_utils.chef.model.ingredient.ingredient_from_dict() method.
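The sketch below shows how keyword arguments are forwarded. Every keyword becomes an entry of one dict, and that dict is passed as the dictionary argument. Both functions are hypothetical stand-ins: the stub only echoes its input and does not build a real Ingredient.

```python
def ingredient_from_dict_stub(dictionary, **chef_options):
    """Hypothetical stand-in for ingredient_from_dict(): echoes its inputs."""
    return {"dictionary": dictionary, "chef_options": chef_options}


def add_ingredient_sketch(**kwargs):
    # all keyword arguments end up in one dict, which is passed
    # as the `dictionary` argument
    return ingredient_from_dict_stub(dictionary=kwargs)


result = add_ingredient_sketch(
    id="example-ingredient",
    dataset="ddf--example--dataset",
    key="geo,time",
    value=["concept_1", "concept_2"],
)
print(result["dictionary"]["key"])  # geo,time
```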

add_metadata(**metadata)

Add metadata. All keyword arguments are added to the metadata dictionary, replacing any existing entries with the same key.

add_procedure(collection, procedure, ingredients, result=None, options=None)
config
copy()
classmethod from_recipe(recipe_file, **config)
ingredients
static register_procedure(func)
run(serve=False, outpath=None)
serving
to_graph(node=None)
to_recipe(fp=None)

Write the chef in YAML recipe format.

validate()

Validate whether the chef is ready to run.

The following checks are performed:

  1. check whether the datasets required by ingredients are available
  2. check whether the procedures are available
  3. check whether the DAG is valid, i.e. it has no dependency cycles and no missing dependencies.
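The three checks listed above can be sketched with set differences plus the standard library's graphlib (Python 3.9+) for cycle detection. This is a hypothetical illustration: validate_sketch, its arguments, and the procedure names in the usage are assumptions. It returns a list of problems instead of raising, and says nothing about how Chef.validate() reports errors.

```python
from graphlib import CycleError, TopologicalSorter  # Python 3.9+


def validate_sketch(required_datasets, available_datasets,
                    required_procedures, registered_procedures, upstreams):
    """Hypothetical validate(): return a list of problems (empty means OK).

    upstreams maps each node_id to the node_ids it depends on.
    """
    problems = []
    # 1. datasets required by ingredients must be available
    for ds in sorted(set(required_datasets) - set(available_datasets)):
        problems.append(f"missing dataset: {ds}")
    # 2. procedures used in the recipe must be registered
    for proc in sorted(set(required_procedures) - set(registered_procedures)):
        problems.append(f"unknown procedure: {proc}")
    # 3a. every upstream must itself be a node of the DAG
    for node, ups in upstreams.items():
        for up in ups:
            if up not in upstreams:
                problems.append(f"{node} depends on missing node {up}")
    # 3b. no dependency cycles (CycleError.args[1] holds the cycle)
    try:
        TopologicalSorter(upstreams).prepare()
    except CycleError as err:
        problems.append(f"dependency cycle: {err.args[1]}")
    return problems


problems = validate_sketch(
    required_datasets=["ddf--example--dataset"],
    available_datasets=["ddf--example--dataset"],
    required_procedures=["translate_header", "merge"],
    registered_procedures=["translate_header"],
    upstreams={"a": [], "b": ["a"], "c": ["b", "x"]},
)
print(problems)
# ['unknown procedure: merge', 'c depends on missing node x']
```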

ddf_utils.chef.model.dag

the DAG model of chef

The DAG consists of two types of nodes: IngredientNode and ProcedureNode. Each node has an evaluate() function, which returns an ingredient when called.
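The evaluation model described above can be sketched with two tiny classes. An ingredient node returns its payload unchanged. A procedure node first evaluates its upstream nodes, then applies its procedure to their results. The class names, and the use of a plain Python callable as the "procedure", are hypothetical simplifications; real procedures are dictionaries from the recipe.

```python
class IngredientNodeSketch:
    """Hypothetical node holding a ready-made ingredient."""

    def __init__(self, node_id, ingredient):
        self.node_id = node_id
        self.ingredient = ingredient

    def evaluate(self):
        # ingredient nodes return their ingredient as is
        return self.ingredient


class ProcedureNodeSketch:
    """Hypothetical node whose value is computed from upstream nodes."""

    def __init__(self, node_id, func, upstream_nodes):
        self.node_id = node_id
        self.func = func
        self.upstream_nodes = upstream_nodes

    def evaluate(self):
        # upstream nodes are evaluated only when this node is evaluated
        inputs = [node.evaluate() for node in self.upstream_nodes]
        return self.func(*inputs)


a = IngredientNodeSketch("ing-a", [1, 2])
b = IngredientNodeSketch("ing-b", [3])
merged = ProcedureNodeSketch("merged", lambda x, y: x + y, [a, b])
doubled = ProcedureNodeSketch("doubled", lambda x: [v * 2 for v in x], [merged])
print(doubled.evaluate())  # [2, 4, 6]
```

Evaluating the last node pulls data through the whole chain, so only nodes reachable upstream of the requested one are computed.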

class ddf_utils.chef.model.dag.BaseNode(node_id, chef)

Bases: object

The base node from which IngredientNode and ProcedureNode inherit.

Parameters:
  • node_id (str) – the name of the node
  • chef (Chef) – the Chef object the node belongs to

add_downstream(node)
add_upstream(node)
detect_missing_dependency()

Check that every upstream node is available in the DAG; raise an error if any is missing.

downstream_list
evaluate()
get_direct_relatives(upstream=False)

Get the direct relatives to the current node, upstream or downstream.

upstream_list
class ddf_utils.chef.model.dag.DAG(node_dict=None)

Bases: object

The DAG model.

A DAG (directed acyclic graph) is a collection of nodes with directional dependencies. DAGs essentially act as namespaces for their nodes: a node_id can only be added once to a DAG.

add_dependency(upstream_node_id, downstream_node_id)

Simple utility method to set a dependency between two nodes that have already been added to the DAG using add_node().

add_node(node)

Add a node to the DAG.
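The two rules above can be sketched with upstream/downstream adjacency sets:

  • add_node() rejects a node_id that is already present.
  • add_dependency() only links nodes that were added first.

DAGSketch is a hypothetical class, and the exception types it raises are assumptions, not the errors ddf_utils raises.

```python
class DAGSketch:
    """Hypothetical minimal DAG keyed by node_id (not the ddf_utils class)."""

    def __init__(self):
        self.upstream = {}    # node_id -> set of upstream node_ids
        self.downstream = {}  # node_id -> set of downstream node_ids

    def add_node(self, node_id):
        # the DAG acts as a namespace: each node_id may be added only once
        if node_id in self.upstream:
            raise ValueError(f"node {node_id!r} is already in the DAG")
        self.upstream[node_id] = set()
        self.downstream[node_id] = set()

    def add_dependency(self, upstream_node_id, downstream_node_id):
        # both nodes must have been added with add_node() first
        for node_id in (upstream_node_id, downstream_node_id):
            if node_id not in self.upstream:
                raise KeyError(f"node {node_id!r} has not been added")
        self.downstream[upstream_node_id].add(downstream_node_id)
        self.upstream[downstream_node_id].add(upstream_node_id)


dag = DAGSketch()
dag.add_node("ing-a")
dag.add_node("merged")
dag.add_dependency("ing-a", "merged")
print(dag.upstream["merged"])  # {'ing-a'}
```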

copy()
detect_cycles()

Detect cycles in the DAG, following Tarjan’s algorithm.
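Tarjan's algorithm finds strongly connected components (SCCs) in one depth-first pass. A graph has a cycle exactly when some SCC has more than one node, or a node links to itself. The sketch below is a generic implementation over a plain adjacency dict. The function name and input format are assumptions; it is not the code behind DAG.detect_cycles().

```python
def detect_cycles_tarjan(graph):
    """Return the strongly connected components that form cycles.

    graph maps each node to a list of successor nodes.
    """
    index, lowlink = {}, {}
    stack, on_stack = [], set()
    counter = [0]
    cycles = []

    def strongconnect(v):
        # assign the DFS discovery index and initial low-link value
        index[v] = lowlink[v] = counter[0]
        counter[0] += 1
        stack.append(v)
        on_stack.add(v)
        for w in graph.get(v, []):
            if w not in index:
                strongconnect(w)
                lowlink[v] = min(lowlink[v], lowlink[w])
            elif w in on_stack:
                lowlink[v] = min(lowlink[v], index[w])
        # v is the root of an SCC: pop the whole component off the stack
        if lowlink[v] == index[v]:
            component = []
            while True:
                w = stack.pop()
                on_stack.discard(w)
                component.append(w)
                if w == v:
                    break
            # an SCC is a cycle if it has several nodes or a self-loop
            if len(component) > 1 or v in graph.get(v, []):
                cycles.append(component)

    for v in graph:
        if v not in index:
            strongconnect(v)
    return cycles


print(detect_cycles_tarjan({"a": ["b"], "b": ["c"], "c": ["a"], "d": ["a"]}))
print(detect_cycles_tarjan({"a": ["b"], "b": []}))  # []
```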

get_node(node_id)
has_node(node_id)
node_dict
nodes

Return all nodes.

roots

Return the roots of the DAG.

tree_view()

Show an ASCII tree representation of the DAG.
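An ASCII tree view can be produced by a recursive walk that indents each level. The sketch below is hypothetical: it takes the "children" relation as a plain dict. Whether DAG.tree_view() walks upstream or downstream from its roots is not stated here, so the direction is left to the caller.

```python
def tree_view_sketch(children, start_nodes, indent=4):
    """Return an indented ASCII tree; `children` maps node -> child nodes."""
    lines = []

    def walk(node, level):
        # one line per node, indented by its depth in the tree
        lines.append(" " * (level * indent) + str(node))
        for child in children.get(node, []):
            walk(child, level + 1)

    for node in start_nodes:
        walk(node, 0)
    return "\n".join(lines)


# here each node lists the nodes it depends on
deps = {"final-dish": ["merged"], "merged": ["ing-a", "ing-b"]}
print(tree_view_sketch(deps, ["final-dish"]))
# final-dish
#     merged
#         ing-a
#         ing-b
```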

class ddf_utils.chef.model.dag.IngredientNode(node_id, ingredient, chef)

Bases: ddf_utils.chef.model.dag.BaseNode

Node for storing dataset ingredients.

Parameters: ingredient (Ingredient) – the ingredient in this node

evaluate() → ddf_utils.chef.model.ingredient.Ingredient

Return the ingredient as is.

class ddf_utils.chef.model.dag.ProcedureNode(node_id, procedure, chef)

Bases: ddf_utils.chef.model.dag.BaseNode

The node for storing procedure results.

The evaluate() function runs a procedure according to self.procedure, using other nodes’ data. Other nodes are evaluated when necessary.

Parameters: procedure (dict) – the procedure dictionary

evaluate() → ddf_utils.chef.model.ingredient.Ingredient

ddf_utils.chef.model.ingredient

Main ingredient class.

class ddf_utils.chef.model.ingredient.Ingredient(id: str, key: Union[list, str], value: Union[list, dict, str] = '*', dataset: str = None, data: dict = None, row_filter: dict = None, base_dir: str = './')

Bases: abc.ABC

Protocol class for all ingredients.

All ingredients should have the following format:

id: example-ingredient
dataset: ddf--example--dataset
key: "geo,time"  # key columns of ingredient
value:  # only include concepts listed here
  - concept_1
  - concept_2
filter:  # select rows by column values
  geo:  # only keep datapoint where `geo` is in [swe, usa, chn]
    - swe
    - usa
    - chn

Another way to define the ingredient data is the data keyword, which either includes an external CSV file or inlines the data in the ingredient definition. Example:

id: example-ingredient
key: concept
data: external_concepts.csv

On-the-fly ingredient:

id: example-ingredient
key: concept
data:
    - concept: concept_1
      name: concept_name_1
      concept_type: string
      description: concept_description_1
    - concept: concept_2
      name: concept_name_2
      concept_type: measure
      description: concept_description_2

dataset_path

Return the full path to the ingredient’s dataset if the ingredient comes from a local DDF dataset.

ddf
ddf_id
dtype = 'abc'
static filter_row(data: dict, row_filter)

Return the rows selected by row_filter.
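For the simple filter form shown in the recipe example above (`geo: [swe, usa, chn]`), row selection amounts to keeping rows whose column value is in the allowed list. The sketch below is hypothetical and uses lists of row dicts instead of DataFrames. It does not cover any richer filter syntax filter_row() may support, and the values are dummy placeholders.

```python
def filter_row_sketch(data, row_filter):
    """Hypothetical filter_row for the simple `column: [allowed values]` form.

    data maps a name to a list of row dicts.
    """
    result = {}
    for name, rows in data.items():
        result[name] = [
            row for row in rows
            # keep the row only if every filtered column it has holds an allowed value
            if all(row[col] in allowed
                   for col, allowed in row_filter.items() if col in row)
        ]
    return result


data = {"population": [
    {"geo": "swe", "time": 2000, "population": 1},
    {"geo": "nor", "time": 2000, "population": 2},
    {"geo": "usa", "time": 2000, "population": 3},
]}
filtered = filter_row_sketch(data, {"geo": ["swe", "usa", "chn"]})
print([row["geo"] for row in filtered["population"]])  # ['swe', 'usa']
```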

classmethod from_procedure_result(id, key, data_computed: dict)
get_data()
ingredient_type
serve(*args, **kwargs)

Serve data to disk.

class ddf_utils.chef.model.ingredient.ConceptIngredient(id: str, key: Union[list, str], value: Union[list, dict, str] = '*', dataset: str = None, data: dict = None, row_filter: dict = None, base_dir: str = './')

Bases: ddf_utils.chef.model.ingredient.Ingredient

dtype = 'concepts'
get_data() → Dict[str, pandas.DataFrame]
static get_data_from_ddf_dataset(dataset_path, value, row_filter)
static get_data_from_external_csv(file_path, key, row_filter)
static get_data_from_inline_data(data, key, row_filter)
serve(outpath, **options)

Serve data to disk.

class ddf_utils.chef.model.ingredient.EntityIngredient(id: str, key: Union[list, str], value: Union[list, dict, str] = '*', dataset: str = None, data: dict = None, row_filter: dict = None, base_dir: str = './')

Bases: ddf_utils.chef.model.ingredient.Ingredient

dtype = 'entities'
get_data() → Dict[str, pandas.DataFrame]
static get_data_from_ddf_dataset(dataset_path, key, value, row_filter)
static get_data_from_external_csv(file_path, key, row_filter)
static get_data_from_inline_data(data, key, row_filter)
serve(outpath, **options)

Serve data to disk.

class ddf_utils.chef.model.ingredient.DataPointIngredient(id: str, key: Union[list, str], value: Union[list, dict, str] = '*', dataset: str = None, data: dict = None, row_filter: dict = None, base_dir: str = './')

Bases: ddf_utils.chef.model.ingredient.Ingredient

compute() → Dict[str, pandas.DataFrame]

Return a pandas DataFrame version of self.data.

dtype = 'datapoints'
classmethod from_procedure_result(id, key, data_computed: dict)
get_data() → Dict[str, dask.dataframe.DataFrame]
static get_data_from_ddf_dataset(id, dataset_path, key, value, row_filter)
static get_data_from_external_csv(file_path, key, row_filter)
static get_data_from_inline_data(data, key, row_filter)
serve(outpath, **options)

Serve data to disk.

ddf_utils.chef.model.ingredient.ingredient_from_dict(dictionary: dict, **chef_options) → ddf_utils.chef.model.ingredient.Ingredient

Create an ingredient from a recipe definition and options. Parameters for the ingredient should be passed in a dictionary. See the "3. Define Ingredients" section of the documentation, or ddf_utils.chef.model.ingredient.Ingredient, for available parameters.
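ingredient_from_dict() must decide which Ingredient subclass to build, but the rule is not spelled out here. The sketch below assumes the usual DDF convention:

  • a key of `concept` means concepts
  • a single key column means entities
  • several key columns mean datapoints

SketchIngredient, guess_dtype and ingredient_from_dict_sketch are hypothetical names, not ddf_utils API. The dtype strings match the dtype attributes documented above.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class SketchIngredient:
    """Hypothetical container, not ddf_utils' Ingredient."""
    id: str
    key: List[str]
    dtype: str
    options: Dict = field(default_factory=dict)


def guess_dtype(key: List[str]) -> str:
    # assumed DDF convention: concept key -> concepts,
    # one key column -> entities, several -> datapoints
    if key == ["concept"]:
        return "concepts"
    if len(key) == 1:
        return "entities"
    return "datapoints"


def ingredient_from_dict_sketch(dictionary: dict) -> SketchIngredient:
    d = dict(dictionary)  # copy so the caller's dict is left untouched
    ing_id = d.pop("id")
    key = d.pop("key")
    if isinstance(key, str):
        key = [k.strip() for k in key.split(",")]
    # everything else (dataset, value, filter, data, ...) is kept as options
    return SketchIngredient(ing_id, key, guess_dtype(key), d)


ing = ingredient_from_dict_sketch({
    "id": "example-ingredient",
    "dataset": "ddf--example--dataset",
    "key": "geo,time",
    "value": ["concept_1", "concept_2"],
})
print(ing.key, ing.dtype)  # ['geo', 'time'] datapoints
```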

ddf_utils.chef.model.ingredient.key_to_list(key)

Return the primaryKey of this ingredient as a list.
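The Ingredient signature accepts key as either a list or a string. The recipe examples use comma-separated strings such as "geo,time", so normalising to a list is a split on commas. The following is a minimal hypothetical re-implementation. Stripping surrounding whitespace is an assumption and may differ from the real key_to_list().

```python
from typing import List, Union


def key_to_list_sketch(key: Union[List[str], str]) -> List[str]:
    """Hypothetical re-implementation: normalise a key to a list of column names."""
    if isinstance(key, list):
        return list(key)  # already a list: return a copy
    # comma-separated string such as "geo,time"; surrounding spaces are dropped
    return [part.strip() for part in key.split(",")]


print(key_to_list_sketch("geo,time"))       # ['geo', 'time']
print(key_to_list_sketch("concept"))        # ['concept']
print(key_to_list_sketch(["geo", "time"]))  # ['geo', 'time']
```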

ddf_utils.chef.model.ingredient.get_ingredient_class(cls)