pushx.providers.serverchan3 源代码
import json
import logging
from typing import Optional
import httpx
from pydantic import Field, AliasChoices
from pushx.provider import (
ProviderMetadata,
BasePushProvider,
BaseProviderParams,
PushResult,
)
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
# Metadata
# noinspection SpellCheckingInspection
[文档]
class NotifyParams(BaseProviderParams):
"""Notify 所需参数"""
title: str = Field(..., validation_alias=AliasChoices("title", "text"))
"""通知的标题"""
desp: str = Field(None, validation_alias=AliasChoices("desp", "content", "message"))
"""通知的内容,支持 Markdown"""
tags: str = None
"""通知的 tags"""
short: str = None
"""通知的略缩"""
# noinspection SpellCheckingInspection
[文档]
class NotifierParams(BaseProviderParams):
"""Notifier 所需参数"""
sendkey: str
"""ServerChan3 的 SendKey"""
uid: int
"""ServerChan3 的 UID"""
__provider_meta__ = ProviderMetadata(
name="ServerChan3",
class_name="ServerChan3",
description="ServerChan3 Provider",
notifier_params=NotifierParams,
notify_params=NotifyParams,
extra={},
)
[文档]
class ServerChan3(BasePushProvider):
def _set_notifier_params(self, params: Optional[NotifierParams] = None, **kwargs):
if params is None:
self._notifier_params = NotifierParams(**kwargs)
elif kwargs:
raise ValueError(
"You cannot pass NotifierParams objects and keyword arguments at the same time"
)
else:
self._notifier_params = params
def _notify(self, params: Optional[NotifyParams] = None, **kwargs) -> PushResult:
if params is None:
notify_params = NotifyParams(**kwargs)
elif kwargs:
raise ValueError(
"You cannot pass in NotifyParams objects and keyword arguments at the same time"
)
else:
notify_params = params
response = httpx.post(
f"https://{self._notifier_params.uid}.push.ft07.com/send/{self._notifier_params.sendkey}.send",
json=json.loads(notify_params.model_dump_json()),
)
try:
if json.loads(response.text)["code"] == 0:
return PushResult(success=True, code=200)
else:
logger.error(f"ServerChan3 Push error, detail:{response.text}")
return PushResult(
success=False,
code=500,
msg="An unexpected situation occurred, please refer to the response in data",
data=response.text,
)
except Exception as e:
logger.error(
f"ServerChan3 Push error, detail:{e}, response detail: {response.text}"
)
return PushResult(
success=False,
code=500,
msg="An unexpected situation occurred, please refer to the response in data",
data=response.text,
)