Initial re-upload of spice2x-24-08-24

This commit is contained in:
2024-08-28 11:10:34 -04:00
commit caa9e02285
1181 changed files with 380065 additions and 0 deletions

View File

@@ -0,0 +1,24 @@
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.
In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
For more information, please refer to <http://unlicense.org/>

View File

@@ -0,0 +1,14 @@
from .connection import Connection
from .request import Request
from .analogs import *
from .buttons import *
from .card import *
from .coin import *
from .control import *
from .exceptions import *
from .iidx import *
from .info import *
from .keypads import *
from .lights import *
from .memory import *
from .touch import *

View File

@@ -0,0 +1,28 @@
from .connection import Connection
from .request import Request
def analogs_read(con: Connection):
res = con.request(Request("analogs", "read"))
return res.get_data()
def analogs_write(con: Connection, analog_state_list):
req = Request("analogs", "write")
for state in analog_state_list:
req.add_param(state)
con.request(req)
def analogs_write_reset(con: Connection, analog_names=None):
req = Request("analogs", "write_reset")
# reset all analogs
if not analog_names:
con.request(req)
return
# reset specified analogs
for analog_name in analog_names:
req.add_param(analog_name)
con.request(req)

View File

@@ -0,0 +1,28 @@
from .connection import Connection
from .request import Request
def buttons_read(con: Connection):
res = con.request(Request("buttons", "read"))
return res.get_data()
def buttons_write(con: Connection, button_state_list):
req = Request("buttons", "write")
for state in button_state_list:
req.add_param(state)
con.request(req)
def buttons_write_reset(con: Connection, button_names=None):
req = Request("buttons", "write_reset")
# reset all buttons
if not button_names:
con.request(req)
return
# reset specified buttons
for button_name in button_names:
req.add_param(button_name)
con.request(req)

View File

@@ -0,0 +1,9 @@
from .connection import Connection
from .request import Request
def card_insert(con: Connection, unit: int, card_id: str):
req = Request("card", "insert")
req.add_param(unit)
req.add_param(card_id)
con.request(req)

View File

@@ -0,0 +1,20 @@
from .connection import Connection
from .request import Request
def coin_get(con: Connection):
res = con.request(Request("coin", "get"))
return res.get_data()[0]
def coin_set(con: Connection, amount: int):
req = Request("coin", "set")
req.add_param(amount)
con.request(req)
def coin_insert(con: Connection, amount=1):
req = Request("coin", "insert")
if amount != 1:
req.add_param(amount)
con.request(req)

View File

@@ -0,0 +1,139 @@
import os
import socket
from .request import Request
from .response import Response
from .rc4 import rc4
from .exceptions import MalformedRequestException, APIError
class Connection:
""" Container for managing a single connection to the API server.
"""
def __init__(self, host: str, port: int, password: str):
"""Default constructor.
:param host: the host string to connect to
:param port: the port of the host
:param password: the connection password string
"""
self.host = host
self.port = port
self.password = password
self.socket = None
self.cipher = None
self.reconnect()
def reconnect(self, refresh_session=True):
"""Reconnect to the server.
This opens a new connection and closes the previous one, if existing.
"""
# close old socket
self.close()
# create new socket
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(3)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.socket.connect((self.host, self.port))
# cipher
self.change_password(self.password)
# refresh session
if refresh_session:
from .control import control_session_refresh
control_session_refresh(self)
def change_password(self, password):
"""Allows to change the password on the fly.
The cipher will be rebuilt.
"""
if len(password) > 0:
self.cipher = rc4(password.encode("UTF-8"))
else:
self.cipher = None
def close(self):
"""Close the active connection, if existing."""
# check if socket is existing
if self.socket:
# close and delete socket
self.socket.close()
self.socket = None
def request(self, request: Request):
"""Send a request to the server and receive the answer.
:param request: request object
:return: response object
"""
# check if disconnected
if not self.socket:
raise RuntimeError("No active connection.")
# build data
data = request.to_json().encode("UTF-8") + b"\x00"
if self.cipher:
data_list = list(data)
data_cipher = []
for b in data_list:
data_cipher.append(b ^ next(self.cipher))
data = bytes(data_cipher)
# send request
if os.name != 'nt':
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)
self.socket.send(data)
# get answer
answer_data = []
while not len(answer_data) or answer_data[-1] != 0:
# receive data
if os.name != 'nt':
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)
receive_data = self.socket.recv(4096)
# check length
if len(receive_data):
# check cipher
if self.cipher:
# add decrypted data
for b in receive_data:
answer_data.append(int(b ^ next(self.cipher)))
else:
# add plaintext
for b in receive_data:
answer_data.append(int(b))
else:
raise RuntimeError("Connection was closed.")
# check for empty response
if len(answer_data) <= 1:
# empty response means the JSON couldn't be parsed
raise MalformedRequestException()
# build response
response = Response(bytes(answer_data[:-1]).decode("UTF-8"))
if len(response.get_errors()):
raise APIError(response.get_errors())
# check ID
req_id = request.get_id()
res_id = response.get_id()
if req_id != res_id:
raise RuntimeError(f"Unexpected response ID: {res_id} (expected {req_id})")
# return response object
return response

View File

@@ -0,0 +1,51 @@
import random
from .connection import Connection
from .request import Request
def control_raise(con: Connection, signal: str):
req = Request("control", "raise")
req.add_param(signal)
con.request(req)
def control_exit(con: Connection, code=None):
req = Request("control", "exit")
if code:
req.add_param(code)
try:
con.request(req)
except RuntimeError:
pass # we expect the connection to get killed
def control_restart(con: Connection):
req = Request("control", "restart")
try:
con.request(req)
except RuntimeError:
pass # we expect the connection to get killed
def control_session_refresh(con: Connection):
res = con.request(Request("control", "session_refresh", req_id=random.randint(1, 2**64)))
# apply new password
password = res.get_data()[0]
con.change_password(password)
def control_shutdown(con: Connection):
req = Request("control", "shutdown")
try:
con.request(req)
except RuntimeError:
pass # we expect the connection to get killed
def control_reboot(con: Connection):
req = Request("control", "reboot")
try:
con.request(req)
except RuntimeError:
pass # we expect the connection to get killed

View File

@@ -0,0 +1,10 @@
class APIError(Exception):
def __init__(self, errors):
super().__init__("\r\n".join(errors))
class MalformedRequestException(Exception):
pass

View File

@@ -0,0 +1,18 @@
from .connection import Connection
from .request import Request
def iidx_ticker_get(con: Connection):
res = con.request(Request("iidx", "ticker_get"))
return res.get_data()
def iidx_ticker_set(con: Connection, text: str):
req = Request("iidx", "ticker_set")
req.add_param(text)
con.request(req)
def iidx_ticker_reset(con: Connection):
req = Request("iidx", "ticker_reset")
con.request(req)

View File

@@ -0,0 +1,17 @@
from .connection import Connection
from .request import Request
def info_avs(con: Connection):
res = con.request(Request("info", "avs"))
return res.get_data()[0]
def info_launcher(con: Connection):
res = con.request(Request("info", "launcher"))
return res.get_data()[0]
def info_memory(con: Connection):
res = con.request(Request("info", "memory"))
return res.get_data()[0]

View File

@@ -0,0 +1,24 @@
from .connection import Connection
from .request import Request
def keypads_write(con: Connection, keypad: int, input_values: str):
req = Request("keypads", "write")
req.add_param(keypad)
req.add_param(input_values)
con.request(req)
def keypads_set(con: Connection, keypad: int, input_values: str):
req = Request("keypads", "set")
req.add_param(keypad)
for value in input_values:
req.add_param(value)
con.request(req)
def keypads_get(con: Connection, keypad: int):
req = Request("keypads", "get")
req.add_param(keypad)
res = con.request(req)
return res.get_data()

View File

@@ -0,0 +1,28 @@
from .connection import Connection
from .request import Request
def lights_read(con: Connection):
res = con.request(Request("lights", "read"))
return res.get_data()
def lights_write(con: Connection, light_state_list):
req = Request("lights", "write")
for state in light_state_list:
req.add_param(state)
con.request(req)
def lights_write_reset(con: Connection, light_names=None):
req = Request("lights", "write_reset")
# reset all lights
if not light_names:
con.request(req)
return
# reset specified lights
for light_name in light_names:
req.add_param(light_name)
con.request(req)

View File

@@ -0,0 +1,31 @@
from .connection import Connection
from .request import Request
def memory_write(con: Connection, dll_name: str, data: str, offset: int):
req = Request("memory", "write")
req.add_param(dll_name)
req.add_param(data)
req.add_param(offset)
con.request(req)
def memory_read(con: Connection, dll_name: str, offset: int, size: int):
req = Request("memory", "read")
req.add_param(dll_name)
req.add_param(offset)
req.add_param(size)
res = con.request(req)
return res.get_data()[0]
def memory_signature(con: Connection, dll_name: str, signature: str,
replacement: str, offset: int, usage: int):
req = Request("memory", "signature")
req.add_param(dll_name)
req.add_param(signature)
req.add_param(replacement)
req.add_param(offset)
req.add_param(usage)
res = con.request(req)
return res.get_data()[0]

View File

@@ -0,0 +1,24 @@
def rc4_ksa(key):
n = len(key)
j = 0
s_box = list(range(256))
for i in range(256):
j = (j + s_box[i] + key[i % n]) % 256
s_box[i], s_box[j] = s_box[j], s_box[i]
return s_box
def rc4_prga(s_box):
i = 0
j = 0
while True:
i = (i + 1) % 256
j = (j + s_box[i]) % 256
s_box[i], s_box[j] = s_box[j], s_box[i]
yield s_box[(s_box[i] + s_box[j]) % 256]
def rc4(key):
return rc4_prga(rc4_ksa(key))

View File

@@ -0,0 +1,63 @@
import json
from threading import Lock
class Request:
# global ID pool
GLOBAL_ID = 1
GLOBAL_ID_LOCK = Lock()
def __init__(self, module: str, function: str, req_id=None):
# use global ID
with Request.GLOBAL_ID_LOCK:
if req_id is None:
# reset at max value
Request.GLOBAL_ID += 1
if Request.GLOBAL_ID >= 2 ** 64:
Request.GLOBAL_ID = 1
# get ID and increase by one
req_id = Request.GLOBAL_ID
else:
# carry over ID
Request.GLOBAL_ID = req_id
# remember ID
self._id = req_id
# build data dict
self.data = {
"id": req_id,
"module": module,
"function": function,
"params": []
}
@staticmethod
def from_json(request_json: str):
req = Request("", "", 0)
req.data = json.loads(request_json)
req._id = req.data["id"]
return req
def get_id(self):
return self._id
def to_json(self):
return json.dumps(
self.data,
ensure_ascii=False,
check_circular=False,
allow_nan=False,
indent=None,
separators=(",", ":"),
sort_keys=False
)
def add_param(self, param):
self.data["params"].append(param)

View File

@@ -0,0 +1,30 @@
import json
class Response:
def __init__(self, response_json: str):
self._res = json.loads(response_json)
self._id = self._res["id"]
self._errors = self._res["errors"]
self._data = self._res["data"]
def to_json(self):
return json.dumps(
self._res,
ensure_ascii=True,
check_circular=False,
allow_nan=False,
indent=2,
separators=(",", ": "),
sort_keys=False
)
def get_id(self):
return self._id
def get_errors(self):
return self._errors
def get_data(self):
return self._data

View File

@@ -0,0 +1,21 @@
from .connection import Connection
from .request import Request
def touch_read(con: Connection):
res = con.request(Request("touch", "read"))
return res.get_data()
def touch_write(con: Connection, touch_points):
req = Request("touch", "write")
for state in touch_points:
req.add_param(state)
con.request(req)
def touch_write_reset(con: Connection, touch_ids):
req = Request("touch", "write_reset")
for touch_id in touch_ids:
req.add_param(touch_id)
con.request(req)