Skip to content

Commit

Permalink
Update bipartition for utility functions
Browse files Browse the repository at this point in the history
  • Loading branch information
acmiyaguchi committed Dec 4, 2019
1 parent 80ec4c7 commit 13fa2c2
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name="wiki-forecast",
version="0.2",
version="0.3.2",
description="Forecasting models for wikipedia page views",
entry_points={"console_scripts": ["wiki-forecast=wikicast.__main__:cli"]},
install_requires=[
Expand Down
34 changes: 21 additions & 13 deletions wikicast/bipartition.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,28 @@ def induce_graph(graph, relabel=True, partitions=[]):
"id", F.row_number().over(window).alias("rank")
).withColumn("rank", F.expr("rank - 1"))

vertices = graph.vertices.join(rank, on="id", how="left")
vertices = graph.vertices.join(rank, on="id", how="left").withColumn(
"relabeled_id", F.col("id")
)

edges = graph.edges.join(
vertices.selectExpr("id as src", "rank as rank_src"), on="src", how="inner"
).join(vertices.selectExpr("id as dst", "rank as rank_dst"), on="dst", how="inner")
edges = (
graph.edges.join(
vertices.selectExpr("id as src", "rank as rank_src"), on="src", how="inner"
)
.join(
vertices.selectExpr("id as dst", "rank as rank_dst"), on="dst", how="inner"
)
.withColumn("relabeled_src", F.col("src"))
.withColumn("relabeled_dst", F.col("dst"))
)

# if partitions:
# edges = edges_with_partitions(GraphFrame(vertices, edges), partitions)

if relabel:
vertices = vertices.withColumn("relabeled_id", F.col("id")).withColumn(
"id", F.col("rank")
)
edges = (
edges.withColumn("relabeled_src", F.col("src"))
.withColumn("relabeled_dst", F.col("dst"))
.withColumn("src", F.col("rank_src"))
.withColumn("dst", F.col("rank_dst"))
vertices = vertices.withColumn("id", F.col("rank"))
edges = edges.withColumn("src", F.col("rank_src")).withColumn(
"dst", F.col("rank_dst")
)

vertices = vertices.drop("rank")
Expand All @@ -71,7 +78,7 @@ def undo_relabel(vertices, name="id", prefix="relabeled"):

def edges_with_partitions(graph, partitions):
"""
Assign each edge to a partition. If the edge is between two partitions, it
Assign each edge to a partition. Edges must be named uniquely.
is removed from the set. The select and where clauses are manually
specificed because two sets of joins lead to ambiguity in the partition
column. e.g.
Expand All @@ -92,6 +99,7 @@ def edges_with_partitions(graph, partitions):
lambda x, y: x & y, [F.col(c).isNotNull() for c in partitions]
)

# TODO: support subtracting edges
edges = (
graph.edges.join(
graph.vertices.select(
Expand Down

0 comments on commit 13fa2c2

Please sign in to comment.