PyJava

PyJava is an ongoing effort to bring data-exchange capability between Java/Scala and Python. It introduces Apache Arrow as the exchange format, which avoids serialization/deserialization between Java/Scala and Python and makes communication significantly more efficient than traditional approaches.

When you invoke Python code from the Java/Scala side, PyJava automatically starts Python workers, sends them the data, and sends the processed results back. The Python workers are reused by default.

The initial code in this library comes from Apache Spark.

Install

Set up a Python (>= 3.6) environment (Conda is recommended):
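A fresh environment can be created first, for example with Conda (the environment name dev matches the examples below):

conda create --name dev python=3.6
source activate dev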

pip uninstall pyjava && pip install pyjava

Set up the Java environment (Maven is recommended):

For Scala 2.11/Spark 2.4.3

<dependency>
    <groupId>tech.mlsql</groupId>
    <artifactId>pyjava-2.4_2.11</artifactId>
    <version>0.3.2</version>
</dependency>

For Scala 2.12/Spark 3.1.1

<dependency>
    <groupId>tech.mlsql</groupId>
    <artifactId>pyjava-3.0_2.12</artifactId>
    <version>0.3.2</version>
</dependency>

Build Manually

Install Build Tool:

pip install mlsql_plugin_tool

Build for Spark 3.1.1:

mlsql_plugin_tool spark311
mvn clean install -DskipTests -Pdisable-java8-doclint -Prelease-sign-artifacts

Build for Spark 2.4.3:

mlsql_plugin_tool spark243
mvn clean install -DskipTests -Pdisable-java8-doclint -Prelease-sign-artifacts

Using a Python code snippet to process data in Java/Scala

With PyJava, you can run arbitrary Python code in your Java/Scala application.

val envs = new util.HashMap[String, String]()
// prepare the Python environment
envs.put(str(PythonConf.PYTHON_ENV), "source activate dev && export ARROW_PRE_0_15_IPC_FORMAT=1 ")

// describe the schema of the data that will be transferred to Python
val sourceSchema = StructType(Seq(StructField("value", StringType)))

val batch = new ArrowPythonRunner(
  Seq(ChainedPythonFunctions(Seq(PythonFunction(
    """
      |import pandas as pd
      |import numpy as np
      |
      |def process():
      |    for item in context.fetch_once_as_rows():
      |        item["value1"] = item["value"] + "_suffix"
      |        yield item
      |
      |context.build_result(process())
    """.stripMargin, envs, "python", "3.6")))), sourceSchema,
  "GMT", Map()
)

// prepare the input data
val sourceEncoder = RowEncoder.apply(sourceSchema).resolveAndBind()
val newIter = Seq(Row.fromSeq(Seq("a1")), Row.fromSeq(Seq("a2"))).map { irow =>
  sourceEncoder.toRow(irow).copy()
}.iterator

// run the code and collect the returned result
val javaContext = new JavaContext
val commonTaskContext = new AppContextImpl(javaContext, batch)
val columnarBatchIter = batch.compute(Iterator(newIter), TaskContext.getPartitionId(), commonTaskContext)

// copying each row (f.copy()) is required because the underlying row object is reused
columnarBatchIter.flatMap { batch =>
  batch.rowIterator.asScala
}.foreach(f => println(f.copy()))
javaContext.markComplete
javaContext.close

Using a Python code snippet to process data in Spark

val session = spark
import session.implicits._
val timezoneid = session.sessionState.conf.sessionLocalTimeZone
val df = session.createDataset[String](Seq("a1", "b1")).toDF("value")
val struct = df.schema
// run the Python snippet on every partition
val abc = df.rdd.mapPartitions { iter =>
  val encoder = RowEncoder.apply(struct).resolveAndBind()
  val envs = new util.HashMap[String, String]()
  envs.put(str(PythonConf.PYTHON_ENV), "source activate streamingpro-spark-2.4.x")
  val batch = new ArrowPythonRunner(
    Seq(ChainedPythonFunctions(Seq(PythonFunction(
      """
        |import pandas as pd
        |import numpy as np
        |for item in data_manager.fetch_once():
        |    print(item)
        |df = pd.DataFrame({'AAA': [4, 5, 6, 7],'BBB': [10, 20, 30, 40],'CCC': [100, 50, -30, -50]})
        |data_manager.set_output([[df['AAA'],df['BBB']]])
      """.stripMargin, envs, "python", "3.6")))), struct,
    timezoneid, Map()
  )
  val newIter = iter.map { irow =>
    encoder.toRow(irow)
  }
  val commonTaskContext = new SparkContextImp(TaskContext.get(), batch)
  val columnarBatchIter = batch.compute(Iterator(newIter), TaskContext.getPartitionId(), commonTaskContext)
  columnarBatchIter.flatMap { batch =>
    batch.rowIterator.asScala.map(_.copy())
  }
}

// convert the returned rows into a DataFrame with the schema produced by the Python code
val wow = SparkUtils.internalCreateDataFrame(session, abc, StructType(Seq(StructField("AAA", LongType), StructField("BBB", LongType))), false)
wow.show()

Run Python Project

With PyJava, you can tell the system where the Python project lives and which file is the entry point, and then run the project from Java/Scala.

"/tmp/data", "tempModelLocalPath" -> "/tmp/model" )) output.foreach(println) ">
import tech.mlsql.arrow.python.runner.PythonProjectRunner

val runner = new PythonProjectRunner("./pyjava/examples/pyproject1", Map())
val output = runner.run(Seq("bash", "-c", "source activate dev && python train.py"), Map(
  "tempDataLocalPath" -> "/tmp/data",
  "tempModelLocalPath" -> "/tmp/model"
))
output.foreach(println)
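
For reference, here is a minimal sketch of what the train.py entry point might look like. It assumes the key/value map passed to run is exposed to the subprocess as environment variables; adjust this to however your project actually receives these paths.

# train.py -- hypothetical entry point for the example project above.
# Assumes tempDataLocalPath/tempModelLocalPath arrive as environment variables.
import os

data_path = os.environ.get("tempDataLocalPath", "/tmp/data")
model_path = os.environ.get("tempModelLocalPath", "/tmp/model")

print(f"reading training data from {data_path}")
# ... train a model here ...
print(f"saving model to {model_path}")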

Example in MLSQL

Non-Interactive Mode:

!python env "PYTHON_ENV=source activate streamingpro-spark-2.4.x";
!python conf "schema=st(field(a,long),field(b,long))";

select 1 as a as table1;

!python on table1 '''

import pandas as pd
import numpy as np
for item in data_manager.fetch_once():
    print(item)
df = pd.DataFrame({'AAA': [4, 5, 6, 8],'BBB': [10, 20, 30, 40],'CCC': [100, 50, -30, -50]})
data_manager.set_output([[df['AAA'],df['BBB']]])

''' named mlsql_temp_table2;

select * from mlsql_temp_table2 as output; 

Interactive Mode:

!python start;

!python env "PYTHON_ENV=source activate streamingpro-spark-2.4.x";
!python env "schema=st(field(a,integer),field(b,integer))";


!python '''
import pandas as pd
import numpy as np
''';

!python  '''
for item in data_manager.fetch_once():
    print(item)
df = pd.DataFrame({'AAA': [4, 5, 6, 8],'BBB': [10, 20, 30, 40],'CCC': [100, 50, -30, -50]})
data_manager.set_output([[df['AAA'],df['BBB']]])
''';
!python close;

Using PyJava as Arrow Server/Client

Java Server side:

val socketRunner = new SparkSocketRunner("wow", NetUtils.getHost, "Asia/Harbin")

val dataSchema = StructType(Seq(StructField("value", StringType)))
val encoder = RowEncoder.apply(dataSchema).resolveAndBind()
val newIter = Seq(Row.fromSeq(Seq("a1")), Row.fromSeq(Seq("a2"))).map { irow =>
  encoder.toRow(irow)
}.iterator
val javaContext = new JavaContext
val commonTaskContext = new AppContextImpl(javaContext, null)

// serve the rows over a socket as an Arrow stream and print the address clients should connect to
val Array(_, host, port) = socketRunner.serveToStreamWithArrow(newIter, dataSchema, 10, commonTaskContext)
println(s"${host}:${port}")
Thread.currentThread().join()

Python Client side:

import os
import socket

from pyjava.serializers import ArrowStreamPandasSerializer

out_ser = ArrowStreamPandasSerializer("Asia/Harbin", False, None)

# fill in the host/port printed by the Java server
HOST = ""
PORT = -1
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect((HOST, PORT))
    buffer_size = int(os.environ.get("SPARK_BUFFER_SIZE", 65536))
    infile = os.fdopen(os.dup(sock.fileno()), "rb", buffer_size)
    outfile = os.fdopen(os.dup(sock.fileno()), "wb", buffer_size)
    # read the Arrow stream and print every pandas batch
    for item in out_ser.load_stream(infile):
        print(item)

Python Server side:

import os

# enable the legacy Arrow IPC format before importing pyjava
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
from pyjava.api.serve import OnceServer

server = OnceServer("127.0.0.1", 11111, "Asia/Harbin")
server.bind()
server.serve([{'id': 9, 'label': 1}])

Java Client side:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import tech.mlsql.arrow.python.iapp.{AppContextImpl, JavaContext}
import tech.mlsql.arrow.python.runner.SparkSocketRunner
import tech.mlsql.common.utils.network.NetUtils

val encoder = RowEncoder.apply(StructType(Seq(StructField("a", LongType), StructField("b", LongType)))).resolveAndBind()
val socketRunner = new SparkSocketRunner("wow", NetUtils.getHost, "Asia/Harbin")
val javaContext = new JavaContext
val commonTaskContext = new AppContextImpl(javaContext, null)
// connect to the Python server and read rows from the Arrow stream
val iter = socketRunner.readFromStreamWithArrow("127.0.0.1", 11111, commonTaskContext)
iter.foreach(i => println(encoder.fromRow(i.copy())))
javaContext.close

How to configure the Python worker to run in Docker (TODO)
