186 lines
5.5 KiB
Go
186 lines
5.5 KiB
Go
// Package ossutil 提供 gateway 侧对 OSS 的轻量操作封装。
|
||
//
|
||
// 涉及 STS 凭证的操作建议在请求粒度调用即可(不要跨请求复用 STS,
|
||
// STS token 过期会导致后续调用 403)。每次调用都换一次凭证,延迟成本可接受。
|
||
package ossutil
|
||
|
||
import (
|
||
"fmt"
|
||
"net/url"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/aliyun/aliyun-oss-go-sdk/oss"
|
||
"github.com/aliyun/credentials-go/credentials"
|
||
"github.com/topfans/backend/gateway/config"
|
||
"github.com/topfans/backend/pkg/logger"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
// ObjectInfo 描述 OSS 对象的关键元数据,供调用方做"按时间过滤孤儿"等判断。
|
||
type ObjectInfo struct {
|
||
Key string
|
||
Size int64
|
||
LastModified time.Time
|
||
}
|
||
|
||
// ExtractKeyFromPublicURL 从完整 OSS 公开 URL 提取对象 key。
|
||
// 仅接受本 bucket 域名(https://<bucket>.oss-<region>.aliyuncs.com/<key>),
|
||
// 防止调用方写入指向其他 bucket / 公网图片的 URL。
|
||
// 任何 query string(缓存击穿用)都会被忽略,只取 path 里的 key。
|
||
func ExtractKeyFromPublicURL(cfg *config.OSSConfig, rawURL string) (string, error) {
|
||
if cfg == nil {
|
||
return "", fmt.Errorf("ossutil: cfg 不能为空")
|
||
}
|
||
if rawURL == "" {
|
||
return "", fmt.Errorf("URL 不能为空")
|
||
}
|
||
u, err := url.Parse(rawURL)
|
||
if err != nil {
|
||
return "", fmt.Errorf("URL 解析失败: %w", err)
|
||
}
|
||
if u.Scheme != "https" && u.Scheme != "http" {
|
||
return "", fmt.Errorf("仅支持 http(s) URL")
|
||
}
|
||
expectedHost := fmt.Sprintf("%s.oss-%s.aliyuncs.com", cfg.BucketName, cfg.Region)
|
||
if u.Host != expectedHost {
|
||
return "", fmt.Errorf("URL host 必须是本 bucket 域名 %s", expectedHost)
|
||
}
|
||
key := strings.TrimPrefix(u.Path, "/")
|
||
if key == "" {
|
||
return "", fmt.Errorf("URL 必须包含对象 key")
|
||
}
|
||
return key, nil
|
||
}
|
||
|
||
// newClient 创建一个使用 STS 临时凭证的 OSS 客户端。
|
||
// 调用方负责 client 生命周期;本包内 helper 各自建一个,不跨调用复用。
|
||
func newClient(cfg *config.OSSConfig) (*oss.Client, *oss.Bucket, func(), error) {
|
||
credConfig := new(credentials.Config).
|
||
SetType("ram_role_arn").
|
||
SetAccessKeyId(cfg.AccessKeyID).
|
||
SetAccessKeySecret(cfg.AccessKeySecret).
|
||
SetRoleArn(cfg.RoleArn).
|
||
SetRoleSessionName("topfans-ossutil").
|
||
SetPolicy("").
|
||
SetRoleSessionExpiration(cfg.TokenExpireTime)
|
||
|
||
provider, err := credentials.NewCredential(credConfig)
|
||
if err != nil {
|
||
return nil, nil, nil, fmt.Errorf("创建凭证提供器失败: %w", err)
|
||
}
|
||
cred, err := provider.GetCredential()
|
||
if err != nil {
|
||
return nil, nil, nil, fmt.Errorf("获取临时凭证失败: %w", err)
|
||
}
|
||
|
||
endpoint := fmt.Sprintf("https://oss-%s.aliyuncs.com", cfg.Region)
|
||
client, err := oss.New(endpoint, *cred.AccessKeyId, *cred.AccessKeySecret,
|
||
oss.SecurityToken(*cred.SecurityToken))
|
||
if err != nil {
|
||
return nil, nil, nil, fmt.Errorf("创建OSS客户端失败: %w", err)
|
||
}
|
||
bucket, err := client.Bucket(cfg.BucketName)
|
||
if err != nil {
|
||
return nil, nil, nil, fmt.Errorf("获取Bucket失败: %w", err)
|
||
}
|
||
|
||
cleanup := func() {
|
||
// OSS SDK 当前没有需要显式关闭的连接;留作未来扩展
|
||
}
|
||
return client, bucket, cleanup, nil
|
||
}
|
||
|
||
// Head 检查指定 key 的对象是否存在。
|
||
// 返回 (true, nil) 表示存在;(false, nil) 表示不存在但调用成功;
|
||
// 返回 (_, err) 表示调用本身失败(网络/权限等)。
|
||
func Head(cfg *config.OSSConfig, key string) (bool, error) {
|
||
if cfg == nil {
|
||
return false, fmt.Errorf("ossutil: cfg 不能为空")
|
||
}
|
||
if key == "" {
|
||
return false, fmt.Errorf("ossutil: key 不能为空")
|
||
}
|
||
_, bucket, cleanup, err := newClient(cfg)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
defer cleanup()
|
||
|
||
exist, err := bucket.IsObjectExist(key)
|
||
if err != nil {
|
||
return false, fmt.Errorf("OSS HEAD 失败: %w", err)
|
||
}
|
||
return exist, nil
|
||
}
|
||
|
||
// Delete 删除指定 key 的对象。
|
||
func Delete(cfg *config.OSSConfig, key string) error {
|
||
if cfg == nil {
|
||
return fmt.Errorf("ossutil: cfg 不能为空")
|
||
}
|
||
if key == "" {
|
||
return fmt.Errorf("ossutil: key 不能为空")
|
||
}
|
||
_, bucket, cleanup, err := newClient(cfg)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer cleanup()
|
||
|
||
if err := bucket.DeleteObject(key); err != nil {
|
||
logger.Logger.Warn("OSS DeleteObject 失败", zap.String("key", key), zap.Error(err))
|
||
return fmt.Errorf("OSS DELETE 失败: %w", err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// List 列出指定前缀下的所有对象元数据(不含公共前缀本身)。
|
||
// maxKeys 传 <=0 时使用 OSS 默认分页行为,每次最多 1000 个,内部循环拉完。
|
||
func List(cfg *config.OSSConfig, prefix string, maxKeys int) ([]ObjectInfo, error) {
|
||
if cfg == nil {
|
||
return nil, fmt.Errorf("ossutil: cfg 不能为空")
|
||
}
|
||
_, bucket, cleanup, err := newClient(cfg)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer cleanup()
|
||
|
||
var objs []ObjectInfo
|
||
var continuationToken string
|
||
for {
|
||
opts := []oss.Option{}
|
||
if prefix != "" {
|
||
opts = append(opts, oss.Prefix(prefix))
|
||
}
|
||
if continuationToken != "" {
|
||
opts = append(opts, oss.ContinuationToken(continuationToken))
|
||
}
|
||
if maxKeys > 0 {
|
||
// 限制单次返回数量,OSS 单次最大 1000
|
||
if maxKeys > 1000 {
|
||
maxKeys = 1000
|
||
}
|
||
opts = append(opts, oss.MaxKeys(maxKeys))
|
||
}
|
||
|
||
result, err := bucket.ListObjects(opts...)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("OSS ListObjects 失败: %w", err)
|
||
}
|
||
for _, obj := range result.Objects {
|
||
objs = append(objs, ObjectInfo{
|
||
Key: obj.Key,
|
||
Size: obj.Size,
|
||
LastModified: obj.LastModified,
|
||
})
|
||
}
|
||
if !result.IsTruncated {
|
||
break
|
||
}
|
||
continuationToken = result.NextMarker
|
||
}
|
||
return objs, nil
|
||
}
|