Install
- pyspark - graphframe Docker Image
https://hub.docker.com/r/tomerlevi/jupyter-spark-graphframes
- Docker Run
docker run -p 8888:8888 -p 4040:4040 -v ~:/home/jovyan/workspace --name jupyter tomerlevi/jupyter-spark-graphframes
Use
- 위 Docker Container가 실행된 후,
Jupyter Lab
으로 이동
http://127.0.0.1:8888/lab
pyspark
를 사용하기 위해,Session
을 생성해야함.
#from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local") \
.appName("Spark") \
.config("spark.some.config.option" , "some-value") \
.config("spark.sql.repl.eagerEval.enabled", True) \
.getOrCreate()
- 나는
파일
로 존재하는Json Data
를 가져와Graphframe을 통해 graph를 생성
하려 함.
with open ("/home/jovyan/work/response.json", 'r') as f:
fread = f.read()
context = json.loads(fread)
with open ("/home/jovyan/work/result.json", 'w') as f:
f.write(json.dumps(...))
df = spark.read.json("/home/jovyan/work/result.json")
주의! :
파일형태의 Json을 read하는 과정에서 오류가 발생 할 수 있다.
그때는 Json을 한줄 형태로 바꿔보자.
- GraphFrames에서 사용하는
노드, 연관 관계
는Vertices와 Edges
로 구분된다.
Vertices와 Edges DataFrame의 기본 형태는 다음과 같다.
vv = spark.createDataFrame(vertices, verti_colums)
ee = spark.createDataFrame(edges, edge_colums)
# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-===-=-=-=-=-=-==-=
# vertices = [(), (), ...]
# edges = [(), (), ...]
# verti_colums = [ column1, column2, ...]
# edges = [ column1, column2, ..., relationship]
# Ex)
# vertices = [(1, "Alpha", 20), (2, "Beta", 25)]
# edges = [(1,2,"friend")]
# verti_columns = ["id", "name", "age"]
# edge_colums = ["src", "dst", "relation"]
- GraphFrame 만들기
위 과정에서 만든 DataFrame을 활용한다.
from graphframe import *
g = GraphFrame(vv, ee)
- 만든 GraphFrame을 시각적으로 표현하기
matplotlib의 pyplot
을 사용한다.
edges_list에 들어갈'src', 'dst'
는 이전에 정의한거에 따라 바뀔 수 있다.
import networkx as nx
import matplotlib.pyplot as plt
def PlotGraph(edge_list):
Gplot=nx.Graph()
tmp = []
for row in edge_list.select('src','dst').take(g.edges.count()):
tmp.append((row['src'], row['dst']))
Gplot.add_edges_from(tmp)
#plt.subplot(121)
plt.figure(figsize=(12, 12))
nx.draw(Gplot, cmap = plt.get_cmap('jet'), node_size=20)
plt.show()
PlotGraph(g.edges)
- 만든 GraphFrame에서
BFS 알고리즘
적용하여 관계 찾아내기
pp = g.bfs(
fromExpr = <시작할 노드의 특정 값>, #ex : "id = 0"
toExpr = <도착할 노드의 특정 값>, #ex : "id = 1"
)
pp.show()
- 이 외, GraphFrames에서 사용할 수 있는 함수들
g.find() #ex : g.find("(parent)-[e]->(child)")
g.filter() #ex : g.filter("e.relation=='child'")
g.show()
pp.select()
pp.first()
- Pandas를 사용하여 Json을 DataFrame으로 만들고 데이터 확인 및 재정렬 하기
import pandas as pd
df.toPandas() # all check
df.toPandas().head() # 상위 5개만
df = df.orderBy("<컬럼명>", ascending=False) # DESC 정렬
참고 : https://docs.databricks.com/_static/notebooks/graphframes-user-guide-py.html
반응형