Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ up:
docker compose up -d

test:
go test -timeout=30m ./...
go test -timeout=10m ./...

test_v:
go test -v ./...

test_short:
go test ./... -short
go test -short ./...

test_race:
go test ./... -short -race
go test -short -race ./...

test_stress:

Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ services:
mysql:
image: mysql:8.0
restart: unless-stopped
command: [ "--max_connections=50000" ]
command: [ "--max_connections=5000" ]
ports:
- 3306:3306
environment:
Expand All @@ -15,7 +15,7 @@ services:
postgres:
image: postgres:15.3
restart: unless-stopped
command: postgres -c 'max_connections=50000'
command: postgres -c 'max_connections=5000'
ports:
- 5432:5432
environment:
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ toolchain go1.24.1
require (
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2
github.com/go-sql-driver/mysql v1.4.1
github.com/jackc/pgx/v5 v5.7.2
github.com/jackc/pgx/v5 v5.7.5
github.com/lib/pq v1.10.9
github.com/oklog/ulid v1.3.1
github.com/pkg/errors v0.9.1
Expand All @@ -27,9 +27,9 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/sony/gobreaker v1.0.0 // indirect
golang.org/x/crypto v0.35.0 // indirect
golang.org/x/sync v0.11.0 // indirect
golang.org/x/text v0.22.0 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/sync v0.13.0 // indirect
golang.org/x/text v0.24.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI=
github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
github.com/jackc/pgx/v5 v5.7.5 h1:JHGfMnQY+IEtGM63d+NGMjoRpysB2JBwDr5fsngwmJs=
github.com/jackc/pgx/v5 v5.7.5/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
Expand All @@ -48,16 +48,16 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs=
golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ func TestDefaultPostgreSQLSchema_planner_mis_estimate_regression(t *testing.T) {
require.NoError(t, err)

messagesCount := 100_000
if testing.Short() {
if testing.Short() || os.Getenv("CI") == "true" {
messagesCount = 1_000
}
tests.AddSimpleMessagesParallel(t, messagesCount, pub, topicName, 50)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/queue_schema_adapter_postgresql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestPostgreSQLQueueSchemaAdapter(t *testing.T) {
case msg := <-messages:
receivedMessages = append(receivedMessages, msg)
msg.Ack()
case <-time.After(100 * time.Millisecond):
case <-time.After(5 * time.Second):
t.Errorf("expected to receive message")
break
}
Expand Down
44 changes: 40 additions & 4 deletions pkg/sql/schema_adapter_postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (s DefaultPostgreSQLSchema) SelectQuery(params SelectQueryParams) (Query, e
` + nextOffsetQuery.Query + `
)

SELECT "offset", transaction_id::text, uuid, payload, metadata FROM ` + s.MessagesTable(params.Topic) + `
SELECT "offset", transaction_id, uuid, payload, metadata FROM ` + s.MessagesTable(params.Topic) + `

WHERE
(
Expand Down Expand Up @@ -269,23 +269,59 @@ func (x *XID8) Scan(src interface{}) error {
return errors.New("cannot scan nil value into XID8")
}

// We want to support scanning from various types (different drivers, like lib/pq, pgx, etc.)
switch v := src.(type) {
case int64:
if v < 0 {
return fmt.Errorf("cannot convert negative int64 %d to XID8", v)
}
*x = XID8(uint64(v))
return nil

case uint64:
*x = XID8(v)
return nil

case int32:
if v < 0 {
return fmt.Errorf("cannot convert negative int32 %d to XID8", v)
}
*x = XID8(uint64(v))
return nil

case uint32:
*x = XID8(v)
return nil

// pgx
case string:
if v == "" {
*x = 0
return nil
}
val, err := strconv.ParseUint(v, 10, 64)
if err != nil {
return err
return fmt.Errorf("cannot parse string %q as uint64: %w", v, err)
}
*x = XID8(val)
return nil

// lib/pq
case []byte:
if len(v) == 0 {
*x = 0
return nil
}

val, err := strconv.ParseUint(string(v), 10, 64)
if err != nil {
return err
return fmt.Errorf("cannot parse bytes %q as uint64: %w", string(v), err)
}
*x = XID8(val)
return nil

default:
return errors.New("unsupported Scan value type for XID8")
return fmt.Errorf("cannot scan %T into XID8", src)
}
}

Expand Down
Loading