본문 바로가기

Tech/Spark

pySpark & GraphFrames

Install

  • pyspark - graphframe Docker Image
  https://hub.docker.com/r/tomerlevi/jupyter-spark-graphframes

 

  • Docker Run
  docker run -p 8888:8888 -p 4040:4040 -v ~:/home/jovyan/workspace --name jupyter tomerlevi/jupyter-spark-graphframes

 


 

Use

  • 위 Docker Container가 실행된 후, Jupyter Lab으로 이동
  http://127.0.0.1:8888/lab

 

  • pyspark를 사용하기 위해, Session을 생성해야함.
  #from pyspark import SparkConf, SparkContext
  from pyspark.sql import SparkSession

  spark = SparkSession.builder \
      .master("local") \
      .appName("Spark") \
      .config("spark.some.config.option" , "some-value") \
      .config("spark.sql.repl.eagerEval.enabled", True) \
      .getOrCreate()

 

  • 나는 파일로 존재하는Json Data를 가져와 Graphframe을 통해 graph를 생성하려 함.
with open ("/home/jovyan/work/response.json", 'r') as f:
    fread = f.read()

context = json.loads(fread)

with open ("/home/jovyan/work/result.json", 'w') as f:
    f.write(json.dumps(...))


df = spark.read.json("/home/jovyan/work/result.json")

 

 

주의! :

파일형태의 Json을 read하는 과정에서 오류가 발생 할 수 있다.

그때는 Json을 한줄 형태로 바꿔보자.

 

 

 

 

 

  • GraphFrames에서 사용하는 노드, 연관 관계Vertices와 Edges로 구분된다.
    Vertices와 Edges DataFrame의 기본 형태는 다음과 같다.
  vv = spark.createDataFrame(vertices, verti_colums)
  ee = spark.createDataFrame(edges, edge_colums)

  # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-===-=-=-=-=-=-==-=
  # vertices = [(), (), ...]
  # edges = [(), (), ...]
  # verti_colums = [ column1, column2, ...]
  # edges = [ column1, column2, ...,  relationship]


  # Ex)
  # vertices = [(1, "Alpha", 20), (2, "Beta", 25)]
  # edges = [(1,2,"friend")]
  # verti_columns = ["id", "name", "age"]
  # edge_colums = ["src", "dst", "relation"]

 

  • GraphFrame 만들기
    위 과정에서 만든 DataFrame을 활용한다.
  from graphframe import *

  g = GraphFrame(vv, ee)

 

  • 만든 GraphFrame을 시각적으로 표현하기
    matplotlib의 pyplot을 사용한다.
    edges_list에 들어갈 'src', 'dst'는 이전에 정의한거에 따라 바뀔 수 있다.
  import networkx as nx
  import matplotlib.pyplot as plt

  def PlotGraph(edge_list):
      Gplot=nx.Graph()
      tmp = []
      for row in edge_list.select('src','dst').take(g.edges.count()):
          tmp.append((row['src'], row['dst']))
      Gplot.add_edges_from(tmp)


      #plt.subplot(121)
      plt.figure(figsize=(12, 12))
      nx.draw(Gplot, cmap = plt.get_cmap('jet'), node_size=20)
      plt.show()
  PlotGraph(g.edges)

 

  • 만든 GraphFrame에서 BFS 알고리즘 적용하여 관계 찾아내기
  pp = g.bfs(
      fromExpr = <시작할 노드의 특정 값>, #ex : "id = 0"
      toExpr = <도착할 노드의 특정 값>, #ex : "id = 1"
      )
  pp.show()

 

  • 이 외, GraphFrames에서 사용할 수 있는 함수들
  g.find()     #ex : g.find("(parent)-[e]->(child)")
  g.filter()  #ex : g.filter("e.relation=='child'")
  g.show()

  pp.select()
  pp.first()

 

  • Pandas를 사용하여 Json을 DataFrame으로 만들고 데이터 확인 및 재정렬 하기
  import pandas as pd

  df.toPandas()  # all check
  df.toPandas().head() # 상위 5개만
  df = df.orderBy("<컬럼명>", ascending=False) # DESC 정렬

참고 : https://docs.databricks.com/_static/notebooks/graphframes-user-guide-py.html

반응형